update how we handle fencing restore instances (#419)
nhudson authored Dec 14, 2023
1 parent f78529d commit 1cfb78b
Showing 5 changed files with 100 additions and 41 deletions.
2 changes: 1 addition & 1 deletion tembo-operator/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion tembo-operator/Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "controller"
description = "Tembo Operator for Postgres"
version = "0.26.1"
version = "0.27.0"
edition = "2021"
default-run = "controller"
license = "Apache-2.0"
65 changes: 44 additions & 21 deletions tembo-operator/src/cloudnativepg/cnpg.rs
@@ -707,6 +707,7 @@ fn cluster_managed(name: &str) -> Option<ClusterManaged> {

// This is a synchronous function that takes the latest_generated_node and diff_instances
// and returns a Vec<String> containing the names of the pods to be fenced.
#[instrument(fields(trace_id))]
fn calculate_pods_to_fence(latest_generated_node: i32, diff_instances: i32, base_name: &str) -> Vec<String> {
let mut pod_names_to_fence = Vec::new();
for i in 1..=diff_instances {
@@ -718,21 +719,45 @@ fn calculate_pods_to_fence(latest_generated_node: i32, diff_instances: i32, base
}

// This is a synchronous function to extend pod_names_to_fence with fenced_pods.
#[instrument(fields(trace_id))]
fn extend_with_fenced_pods(pod_names_to_fence: &mut Vec<String>, fenced_pods: Option<Vec<String>>) {
if let Some(fenced_pods) = fenced_pods {
pod_names_to_fence.extend(fenced_pods);
}
}
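
For orientation, a minimal sketch (not part of this commit) of how the two helpers compose. Since the loop body of calculate_pods_to_fence is collapsed in this view, the pod-name format below is an assumption based on CNPG's {cluster}-{node} naming convention:

// Hypothetical composition of the two helpers above; assumes
// calculate_pods_to_fence names pods "{base_name}-{node}", counting
// up from latest_generated_node.
let mut pods = calculate_pods_to_fence(3, 2, "my-cluster");
// pods would then be ["my-cluster-4", "my-cluster-5"]
extend_with_fenced_pods(&mut pods, Some(vec!["my-cluster-1".to_string()]));
assert_eq!(pods, vec!["my-cluster-4", "my-cluster-5", "my-cluster-1"]);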

// pods_to_fence determines a list of pod names that should be fenced when we detect that new replicas are being created
#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any()))]
async fn pods_to_fence(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<String>, Action> {
// Check whether the initial backup is running, pending, or completed. We
// should never fence a pod while its initial backup is in any of those
// states. During a restore we can enter a reconcile loop in which
// first_recoverability_time is not yet set while a backup is still
// running; in that case we must exit early and not fence the running pod.
if cdb.spec.restore.is_some()
&& is_restore_backup_running_pending_completed(cdb, ctx.clone())
.await
.unwrap_or(false)
{
debug!(
"Running or pending backup detected for instance {}, skipping fencing of pods.",
&cdb.name_any()
);
return Ok(Vec::new());
}

// Check if a restore is requested
if cdb.spec.restore.is_some()
&& cdb
.status
.as_ref()
.and_then(|s| s.first_recoverability_time.as_ref())
.is_none()
&& cdb
.status
.as_ref()
.and_then(|s| s.last_fully_reconciled_at.as_ref())
.is_none()
{
// If restore is requested, fence all the pods based on the cdb.spec.replicas value
let mut pod_names_to_fence = Vec::new();
@@ -1713,16 +1738,13 @@ fn generate_s3_restore_credentials(
}
}

fn is_backup_completed(backup: &Backup) -> bool {
match &backup.status {
Some(status) => status.phase.as_deref() == Some("completed"),
None => false,
}
}

// Look up the Backup status of the instance we are deploying. If the backup isn't
// complete, we will requeue until it is.
pub async fn check_backups_status(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Action> {
// is_restore_backup_running_pending_completed checks whether any backup for
// the instance is in a running, pending, or completed phase, returning a bool
// on success or a requeue Action on error.
#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any()))]
async fn is_restore_backup_running_pending_completed(
cdb: &CoreDB,
ctx: Arc<Context>,
) -> Result<bool, Action> {
let instance_name = cdb.name_any();
let namespace = cdb.namespace().ok_or_else(|| {
error!("Namespace is not set for CoreDB for instance {}", instance_name);
@@ -1736,21 +1758,22 @@ pub async fn check_backups_status(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(),

match backup_result {
Ok(backup_list) => {
if backup_list.items.is_empty() {
error!("No backups found for {}, requeuing", instance_name);
return Err(Action::requeue(Duration::from_secs(30)));
}

for backup_item in backup_list.items {
if let Some(backup_name) = &backup_item.metadata.name {
if !is_backup_completed(&backup_item) {
info!("Backup {} is not completed, requeuing", backup_name);
return Err(Action::requeue(Duration::from_secs(30)));
if let Some(status) = &backup_item.status {
if status.phase.as_deref() == Some("running")
|| status.phase.as_deref() == Some("pending")
|| status.phase.as_deref() == Some("completed")
{
debug!(
"Backup for instance {} is in a {:?} state",
instance_name,
status.phase.as_deref()
);
return Ok(true);
}
info!("Backup {} completed", backup_name);
}
}
Ok(())
Ok(false)
}
Err(e) => {
error!("Error listing backups: {}", e);
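As an aside (not part of this commit), the three chained phase comparisons above could be written more compactly with matches!; a behavior-equivalent sketch:

// Equivalent phase test using matches!; same semantics as the three
// chained comparisons in is_restore_backup_running_pending_completed.
fn backup_phase_blocks_fencing(phase: Option<&str>) -> bool {
    matches!(phase, Some("running" | "pending" | "completed"))
}

assert!(backup_phase_blocks_fencing(Some("running")));
assert!(!backup_phase_blocks_fencing(Some("failed")));
assert!(!backup_phase_blocks_fencing(None));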
18 changes: 1 addition & 17 deletions tembo-operator/src/controller.rs
@@ -6,10 +6,7 @@ use crate::{
app_service::manager::reconcile_app_services,
cloudnativepg::{
backups::Backup,
cnpg::{
check_backups_status, cnpg_cluster_from_cdb, reconcile_cnpg, reconcile_cnpg_scheduled_backup,
reconcile_pooler,
},
cnpg::{cnpg_cluster_from_cdb, reconcile_cnpg, reconcile_cnpg_scheduled_backup, reconcile_pooler},
},
config::Config,
deployment_postgres_exporter::reconcile_prometheus_exporter_deployment,
@@ -312,19 +309,6 @@ impl CoreDB {
let (trunk_installs, extensions) =
reconcile_extensions(self, ctx.clone(), &coredbs, &name).await?;

// Make sure the initial backup has completed if enabled before finishing
// reconciliation of the CoreDB resource that is being restored
if cfg.enable_backup
&& self.spec.restore.is_some()
&& self
.status
.as_ref()
.and_then(|s| s.first_recoverability_time)
.is_none()
{
check_backups_status(self, ctx.clone()).await?;
}

let recovery_time = self.get_recovery_time(ctx.clone()).await?;

let current_config_values = get_current_config_values(self, ctx.clone()).await?;
54 changes: 53 additions & 1 deletion tembo-operator/tests/integration_tests.rs
@@ -535,6 +535,53 @@ mod test {
}
}

// Poll coredb.status.trunk_installs until the given extension reports
// installed without error, or retries are exhausted.
async fn trunk_install_status(coredbs: &Api<CoreDB>, name: &str, extension: &str) -> bool {
let max_retries = 10;
let wait_duration = Duration::from_secs(2); // Adjust as needed

for attempt in 1..=max_retries {
match coredbs.get(name).await {
Ok(coredb) => {
let has_extension_without_error = coredb.status.as_ref().map_or(false, |s| {
s.trunk_installs.as_ref().map_or(false, |installs| {
installs
.iter()
.any(|install| install.name == extension && !install.error)
})
});

if has_extension_without_error {
println!(
"CoreDB {} has trunk_install status for {} without error",
name, extension
);
return true;
} else {
println!(
"Attempt {}/{}: CoreDB {} does not have trunk_install status for {} or has an error",
attempt, max_retries, name, extension
);
}
}
Err(e) => {
println!(
"Failed to get CoreDB on attempt {}/{}: {}",
attempt, max_retries, e
);
}
}

tokio::time::sleep(wait_duration).await;
}

println!(
"CoreDB {} did not have trunk_install status for {} without error after {} attempts",
name, extension, max_retries
);
false
}
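
Since the helper returns a bool, a call site that wants a hard failure on timeout can assert on the result; a minimal sketch (illustrative, not part of this commit):

// Hypothetical call site: fail the test outright if the extension
// never reaches a non-error trunk_install status.
let installed = trunk_install_status(&restore_coredbs, restore_name, "pgmq").await;
assert!(installed, "pgmq was not installed without error");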

#[tokio::test]
#[ignore]
async fn functional_test_basic_cnpg() {
@@ -4247,6 +4294,9 @@ CREATE EVENT TRIGGER pgrst_watch
panic!("Failed to retrieve pods: {:?}", e);
}
};

// Wait for pgmq to be installed before proceeding.
trunk_install_status(&restore_coredbs, restore_name, "pgmq").await;
for pod in &retrieved_pods {
let cmd = vec![
"/bin/sh".to_owned(),
@@ -4257,11 +4307,13 @@
pod_ready_and_running(restore_pods.clone(), pod_name.clone()).await;
let result = run_command_in_container(
restore_pods.clone(),
pod_name,
pod_name.clone(),
cmd.clone(),
Some("postgres".to_string()),
)
.await;
println!("Pod: {}", pod_name.clone());
println!("Result: {:?}", result);
assert!(result.contains("pgmq.control"));
}

