diff --git a/tembo-operator/src/controller.rs b/tembo-operator/src/controller.rs index bdb737ed8..63eefd2f2 100644 --- a/tembo-operator/src/controller.rs +++ b/tembo-operator/src/controller.rs @@ -41,7 +41,7 @@ use crate::{ extensions::{database_queries::list_config_params, reconcile_extensions}, ingress::{reconcile_extra_postgres_ing_route_tcp, reconcile_ip_allowlist_middleware}, network_policies::reconcile_network_policies, - postgres_exporter::reconcile_prom_configmap, + postgres_exporter::reconcile_metrics_configmap, trunk::{extensions_that_require_load, reconcile_trunk_configmap}, }; use rand::Rng; @@ -214,16 +214,15 @@ impl CoreDB { reconcile_app_services(self, ctx.clone()).await?; - if self.spec.postgresExporterEnabled - && self - .spec - .metrics - .as_ref() - .and_then(|m| m.queries.as_ref()) - .is_some() + if self + .spec + .metrics + .as_ref() + .and_then(|m| m.queries.as_ref()) + .is_some() { debug!("Reconciling prometheus configmap"); - reconcile_prom_configmap(self, client.clone(), &ns) + reconcile_metrics_configmap(self, client.clone(), &ns) .await .map_err(|e| { error!("Error reconciling prometheus configmap: {:?}", e); @@ -238,21 +237,6 @@ impl CoreDB { Action::requeue(Duration::from_secs(300)) })?; - // Postgres exporter connection info - if self.spec.postgresExporterEnabled { - let _ = reconcile_postgres_role_secret( - self, - ctx.clone(), - "postgres_exporter", - &format!("{}-exporter", name.clone()), - ) - .await - .map_err(|e| { - error!("Error reconciling postgres exporter secret: {:?}", e); - Action::requeue(Duration::from_secs(300)) - })?; - } - let _ = reconcile_postgres_role_secret(self, ctx.clone(), "readonly", &format!("{}-ro", name.clone())) .await diff --git a/tembo-operator/src/postgres_exporter.rs b/tembo-operator/src/postgres_exporter.rs index 7fcbafee9..97c4be649 100644 --- a/tembo-operator/src/postgres_exporter.rs +++ b/tembo-operator/src/postgres_exporter.rs @@ -174,7 +174,7 @@ impl FromStr for Usage { } } -pub async fn reconcile_prom_configmap(cdb: &CoreDB, client: Client, ns: &str) -> Result<(), Error> { +pub async fn reconcile_metrics_configmap(cdb: &CoreDB, client: Client, ns: &str) -> Result<(), Error> { // set custom pg-prom metrics in configmap values if they are specified let coredb_name = cdb .metadata diff --git a/tembo-operator/tests/integration_tests.rs b/tembo-operator/tests/integration_tests.rs index 66db4c164..1ea4079d9 100644 --- a/tembo-operator/tests/integration_tests.rs +++ b/tembo-operator/tests/integration_tests.rs @@ -30,18 +30,13 @@ mod test { use k8s_openapi::{ api::{ apps::v1::Deployment, - core::v1::{ - Container, Namespace, PersistentVolumeClaim, Pod, PodSpec, ResourceRequirements, Secret, - Service, - }, + core::v1::{Namespace, PersistentVolumeClaim, Pod, ResourceRequirements, Secret, Service}, }, apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition, - apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::ObjectMeta, util::intstr::IntOrString}, + apimachinery::pkg::{api::resource::Quantity, util::intstr::IntOrString}, }; use kube::{ - api::{ - AttachParams, DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent, WatchParams, - }, + api::{AttachParams, DeleteParams, ListParams, Patch, PatchParams, WatchEvent, WatchParams}, runtime::wait::{await_condition, conditions, Condition}, Api, Client, Config, Error, }; @@ -62,7 +57,6 @@ mod test { // Timeout settings while waiting for an event const TIMEOUT_SECONDS_START_POD: u64 = 600; const TIMEOUT_SECONDS_POD_READY: u64 = 600; - const TIMEOUT_SECONDS_SECRET_PRESENT: u64 = 120; const TIMEOUT_SECONDS_NS_DELETED: u64 = 300; const TIMEOUT_SECONDS_POD_DELETED: u64 = 300; const TIMEOUT_SECONDS_COREDB_DELETED: u64 = 300; @@ -110,45 +104,6 @@ mod test { client } - fn wait_for_secret() -> impl Condition { - |obj: Option<&Secret>| { - if let Some(secret) = &obj { - if let Some(t) = &secret.type_ { - return t == "Opaque"; - } - } - false - } - } - - async fn create_test_buddy(pods_api: Api, name: String) -> String { - // Launch a pod we can connect to if we want to - // run commands inside the cluster. - let test_pod_name = format!("test-buddy-{}", name); - let pod = Pod { - metadata: ObjectMeta { - name: Some(test_pod_name.clone()), - ..ObjectMeta::default() - }, - spec: Some(PodSpec { - containers: vec![Container { - command: Some(vec!["sleep".to_string()]), - args: Some(vec!["1200".to_string()]), - name: "test-connection".to_string(), - image: Some("curlimages/curl:latest".to_string()), - ..Container::default() - }], - restart_policy: Some("Never".to_string()), - ..PodSpec::default() - }), - ..Pod::default() - }; - - let _pod = pods_api.create(&PostParams::default(), &pod).await.unwrap(); - - test_pod_name - } - async fn run_command_in_container( pods_api: Api, pod_name: String, @@ -582,6 +537,40 @@ mod test { false } + // Function to wait for metrics to appear + async fn wait_for_metric(pods: Api, pod_name: String, metric_name: &str) -> Result { + let max_retries = 15; // Adjust as needed + let wait_duration = Duration::from_secs(2); + + for attempt in 1..=max_retries { + let command = vec![String::from("curl"), "http://localhost:9187/metrics".to_string()]; + let result_stdout = run_command_in_container( + pods.clone(), + pod_name.clone(), + command, + Some("postgres".to_string()), + ) + .await; + + // Check if the result contains the expected metric + if result_stdout.contains(metric_name) { + return Ok(result_stdout); + } + + println!( + "Attempt {}/{}: Metric '{}' not found in output.", + attempt, max_retries, metric_name + ); + + tokio::time::sleep(wait_duration).await; + } + + Err(format!( + "Metric '{}' not found after {} attempts", + metric_name, max_retries + )) + } + #[tokio::test] #[ignore] async fn functional_test_basic_cnpg() { @@ -646,15 +635,6 @@ mod test { pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - let _ = wait_until_psql_contains( context.clone(), coredb_resource.clone(), @@ -664,37 +644,8 @@ mod test { ) .await; - let coredb_resource = coredbs.get(name).await.unwrap(); - let mut found_extension = false; - let mut retries = 0; - - while retries < 10 { - let status = &coredb_resource.status; - - if let Some(ref status) = status { - if let Some(ref extensions) = status.extensions { - for extension in extensions { - for location in &extension.locations { - if extension.name == "pg_jsonschema" && location.enabled.unwrap_or_default() { - found_extension = true; - assert_eq!(location.database, "postgres"); - assert_eq!( - location.schema.clone().unwrap_or_else(|| "public".to_string()), - "public" - ); - } - } - } - if found_extension { - break; - } - } - } - - // Sleep for a short duration before the next retry - tokio::time::sleep(Duration::from_secs(2)).await; - retries += 1; - } + // Wait for pg_jsonschema to be installed before proceeding. + let found_extension = trunk_install_status(&coredbs, name, "pg_jsonschema").await; assert!(found_extension); // Check for heartbeat table and values @@ -753,7 +704,6 @@ mod test { // Create a pod we can use to run commands in the cluster let pods: Api = Api::namespaced(client.clone(), &namespace); - let test_pod_name = create_test_buddy(pods.clone(), name.to_string()).await; // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); @@ -812,60 +762,12 @@ mod test { let patch = Patch::Apply(&coredb_json); let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap(); - // Wait for secret to be created - let secret_api: Api = Api::namespaced(client.clone(), &namespace); - let secret_name = format!("{}-exporter", name); - println!("Waiting for secret to be created: {}", secret_name); - let establish = await_condition(secret_api.clone(), &secret_name, wait_for_secret()); - let _ = tokio::time::timeout(Duration::from_secs(TIMEOUT_SECONDS_SECRET_PRESENT), establish) - .await - .unwrap_or_else(|_| { - panic!( - "Did not find the secret {} present after waiting {} seconds", - secret_name, TIMEOUT_SECONDS_SECRET_PRESENT - ) - }); - println!("Found secret: {}", secret_name); - - // assert for postgres-exporter secret to be created - let exporter_name = format!("{}-metrics", name); - let exporter_secret_name = format!("{}-exporter", name); - let exporter_secret = secret_api.get(&exporter_secret_name).await; - match exporter_secret { - Ok(secret) => { - // assert for non-empty data in the secret - assert!( - secret.data.map_or(false, |data| !data.is_empty()), - "postgres-exporter secret is empty!" - ); - } - Err(e) => panic!("Error getting postgres-exporter secret: {}", e), - } - // Wait for Pod to be created // This is the CNPG pod let pod_name = format!("{}-1", name); pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - - // assert that the postgres-exporter deployment was created - let deploy_api: Api = Api::namespaced(client.clone(), &namespace); - let exporter_deployment = deploy_api.get(exporter_name.clone().as_str()).await; - assert!( - exporter_deployment.is_ok(), - "postgres-exporter Deployment does not exist: {:?}", - exporter_deployment.err() - ); - // Assert default storage values are applied to PVC let pvc_api: Api = Api::namespaced(client.clone(), &namespace); let default_storage: Quantity = default_storage(); @@ -921,44 +823,26 @@ mod test { println!("{}", result.stdout.clone().unwrap()); assert!(result.stdout.clone().unwrap().contains("aggs_for_vecs")); - // Assert role 'postgres_exporter' was created - let result = psql_with_retry( - context.clone(), - coredb_resource.clone(), - "SELECT rolname FROM pg_roles;".to_string(), - ) - .await; - assert!( - result.stdout.clone().unwrap().contains("postgres_exporter"), - "results must contain postgres_exporter: {}", - result.stdout.clone().unwrap() - ); + // Check for metrics and availability + let metric_name = format!("cnpg_collector_up{{cluster=\"{}\"}} 1", name); + match wait_for_metric(pods.clone(), pod_name.to_string(), &metric_name).await { + Ok(result_stdout) => { + println!("Metric found: {}", result_stdout); + } + Err(e) => { + panic!("Failed to find metric: {}", e); + } + } - // Assert we can curl the metrics from the service - let metrics_service_name = format!("{}-metrics", name); - let command = vec![ - String::from("curl"), - format!("http://{metrics_service_name}/metrics"), - ]; - let result_stdout = - run_command_in_container(pods.clone(), test_pod_name.clone(), command, None).await; - assert!(result_stdout.contains("pg_up 1")); - println!("Found metrics when curling the metrics service"); - - // assert custom queries made it to metric server - let c = vec![ - "wget".to_owned(), - "-qO-".to_owned(), - "http://localhost:9187/metrics".to_owned(), - ]; - let result_stdout = run_command_in_container( - pods.clone(), - exporter_pod_name.to_string(), - c, - Some("postgres-exporter".to_string()), - ) - .await; - assert!(result_stdout.contains(&test_metric_decr)); + // Look for the custom metric + match wait_for_metric(pods.clone(), pod_name.to_string(), &test_metric_decr).await { + Ok(result_stdout) => { + println!("Metric found: {}", result_stdout); + } + Err(e) => { + panic!("Failed to find metric: {}", e); + } + } // Assert we can drop an extension after its been created let coredb_json = serde_json::json!({ @@ -1036,14 +920,6 @@ mod test { let s = storage.get("storage").unwrap().to_owned(); assert_eq!(Quantity("10Gi".to_owned()), s); - // Cleanup test buddy pod resource - pods.delete(&test_pod_name, &Default::default()).await.unwrap(); - println!("Waiting for test buddy pod to be deleted: {}", &test_pod_name); - let _assert_test_buddy_pod_deleted = tokio::time::timeout( - Duration::from_secs(TIMEOUT_SECONDS_POD_DELETED), - await_condition(pods.clone(), &test_pod_name, conditions::is_deleted("")), - ); - // Cleanup CoreDB resource coredbs.delete(name, &Default::default()).await.unwrap(); println!("Waiting for CoreDB to be deleted: {}", &name); @@ -2000,13 +1876,6 @@ mod test { pod_ready_and_running(pods.clone(), pod_name_primary.clone()).await; let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; // Assert that we can query the database with \dx; let result = psql_with_retry(context.clone(), coredb_resource.clone(), "\\dx".to_string()).await; @@ -2145,9 +2014,6 @@ mod test { let kind = "CoreDB"; let replicas = 1; - // Create a pod we can use to run commands in the cluster - let pods: Api = Api::namespaced(client.clone(), &namespace); - // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); let coredbs: Api = Api::namespaced(client.clone(), &namespace); @@ -2193,19 +2059,10 @@ mod test { let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap(); // Wait for CNPG Pod to be created + let pods: Api = Api::namespaced(client.clone(), &namespace); let pod_name = format!("{}-1", name); - pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - wait_until_psql_contains( context.clone(), coredb_resource.clone(), @@ -2288,9 +2145,6 @@ mod test { let kind = "CoreDB"; let replicas = 2; - // Create a pod we can use to run commands in the cluster - let pods: Api = Api::namespaced(client.clone(), &namespace); - // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); let coredbs: Api = Api::namespaced(client.clone(), &namespace); @@ -2308,17 +2162,7 @@ mod test { let params = PatchParams::apply("tembo-integration-test"); let patch = Patch::Apply(&coredb_json); let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap(); - - // Wait for CNPG Pod to be created - let pod_name_primary = format!("{}-1", name); - pod_ready_and_running(pods.clone(), pod_name_primary.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); // Wait for CNPG Cluster to be created by looping over replicas until // they are in a running state @@ -2331,6 +2175,11 @@ mod test { let result = psql_with_retry(context.clone(), coredb_resource.clone(), "\\dx".to_string()).await; assert!(result.stdout.clone().unwrap().contains("plpgsql")); + for i in 1..=replicas { + let pod_name = format!("{}-{}", name, i); + pod_ready_and_running(pods.clone(), pod_name).await; + } + // Assert that both pods are replicating successfully let result = psql_with_retry( context.clone(), @@ -2450,8 +2299,6 @@ mod test { // Create a pod we can use to run commands in the cluster let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); @@ -2477,9 +2324,6 @@ mod test { let pod_name = format!("{}-{}", name, i); pod_ready_and_running(pods.clone(), pod_name).await; } - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; let _result = wait_until_psql_contains( context.clone(), @@ -2672,8 +2516,6 @@ mod test { // Create a pod we can use to run commands in the cluster let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); @@ -2745,9 +2587,6 @@ mod test { let pod_name = format!("{}-{}", name, i); pod_ready_and_running(pods.clone(), pod_name).await; } - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; // Assert that we can query the database with \dx; let result = psql_with_retry(context.clone(), coredb_resource.clone(), "\\dx".to_string()).await; @@ -2783,9 +2622,6 @@ mod test { let pod_name = format!("{}-{}", name, i); pod_ready_and_running(pods.clone(), pod_name).await; } - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; for pod in &retrieved_pods { let cmd = vec![ @@ -3904,15 +3740,6 @@ CREATE EVENT TRIGGER pgrst_watch pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - // Assert status contains configs let mut found_configs = false; let expected_config = ConfigValue::Multiple(BTreeSet::from_iter(vec![ @@ -4079,12 +3906,6 @@ CREATE EVENT TRIGGER pgrst_watch pod_ready_and_running(pods.clone(), pod_name.clone()).await; let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - // Wait for CNPG Cluster to be created by looping over replicas until // they are in a running state for i in 1..=replicas { @@ -4268,11 +4089,6 @@ CREATE EVENT TRIGGER pgrst_watch pod_ready_and_running(restore_pods.clone(), restore_pod_name.clone()).await; let restore_pods: Api = Api::namespaced(client.clone(), &restore_namespace); - let lp = ListParams::default() - .labels(format!("app=postgres-exporter,coredb.io/name={}", restore_name).as_str()); - let exporter_pods = restore_pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); // Wait for CNPG Cluster to be created by looping over replicas until // they are in a running state @@ -4433,15 +4249,6 @@ CREATE EVENT TRIGGER pgrst_watch pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - // Check for pooler let pooler_name = format!("{}-pooler", name); let poolers: Api = Api::namespaced(client.clone(), &namespace);