From 5016af1a6c499385e99f4bbfe829c708a3f6b8ba Mon Sep 17 00:00:00 2001 From: Nick Hudson Date: Sat, 16 Dec 2023 14:39:10 -0600 Subject: [PATCH] Consolidate stack metrics into metrics sidecar instead of postgres exporter deployment (#425) --- charts/tembo-operator/Chart.yaml | 2 +- charts/tembo-operator/templates/crd.yaml | 14 +- conductor/Cargo.lock | 48 +- conductor/tests/integration_tests.rs | 38 +- tembo-operator/Cargo.lock | 2 +- tembo-operator/Cargo.toml | 2 +- tembo-operator/src/apis/coredb_types.rs | 10 +- tembo-operator/src/cloudnativepg/cnpg.rs | 15 + tembo-operator/src/configmap.rs | 3 + tembo-operator/src/controller.rs | 51 +- tembo-operator/src/defaults.rs | 2 +- .../src/deployment_postgres_exporter.rs | 478 ++++++++++-------- tembo-operator/src/postgres_exporter.rs | 51 +- tembo-operator/src/rbac.rs | 462 ++++++++++------- tembo-operator/src/service.rs | 138 +++-- tembo-operator/testdata/prometheus-stack.yaml | 35 ++ tembo-operator/tests/integration_tests.rs | 321 +++--------- tembo-operator/yaml/sample-message-queue.yaml | 4 +- 18 files changed, 877 insertions(+), 799 deletions(-) diff --git a/charts/tembo-operator/Chart.yaml b/charts/tembo-operator/Chart.yaml index 5e69ea1ff..218976fcf 100644 --- a/charts/tembo-operator/Chart.yaml +++ b/charts/tembo-operator/Chart.yaml @@ -3,7 +3,7 @@ name: tembo-operator description: 'Helm chart to deploy the tembo-operator' type: application icon: https://cloud.tembo.io/images/TemboElephant.png -version: 0.2.2 +version: 0.2.3 home: https://tembo.io sources: - https://github.com/tembo-io/tembo-stacks diff --git a/charts/tembo-operator/templates/crd.yaml b/charts/tembo-operator/templates/crd.yaml index e1a1dde6c..f105a0ab1 100644 --- a/charts/tembo-operator/templates/crd.yaml +++ b/charts/tembo-operator/templates/crd.yaml @@ -1614,7 +1614,7 @@ spec: nullable: true properties: enabled: - default: true + default: false description: |- To enable or disable the metric. @@ -1675,18 +1675,12 @@ spec: format: int32 type: integer postgresExporterEnabled: - default: true - description: |- - Enable the use of the Postgres Exporter deployment for metrics collection - - **Default**: true. + default: false + description: '**DEPRECATED** Enable the use of the Postgres Exporter deployment for metrics collection This is no longer used and will be removed in a future release.' type: boolean postgresExporterImage: default: quay.io/prometheuscommunity/postgres-exporter:v0.12.0 - description: |- - The postgres-exporter image you want to use for the postgres-exporter deployment. - - **Default**: quay.io/prometheuscommunity/postgres-exporter:v0.12.0 + description: '**DEPRECATED** The postgres-exporter image you want to use for the postgres-exporter deployment. This is no longer used and will be removed in a future release.' type: string replicas: default: 1 diff --git a/conductor/Cargo.lock b/conductor/Cargo.lock index 3ca687528..153d44c97 100644 --- a/conductor/Cargo.lock +++ b/conductor/Cargo.lock @@ -334,7 +334,7 @@ dependencies = [ "hex", "http", "hyper", - "ring", + "ring 0.16.20", "time", "tokio", "tower", @@ -826,7 +826,7 @@ dependencies = [ [[package]] name = "controller" -version = "0.23.0" +version = "0.28.0" dependencies = [ "actix-web", "anyhow", @@ -2701,12 +2701,26 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", - "untrusted", + "spin 0.5.2", + "untrusted 0.7.1", "web-sys", "winapi", ] +[[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -2742,7 +2756,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" dependencies = [ "log", - "ring", + "ring 0.16.20", "sct", "webpki", ] @@ -2820,8 +2834,8 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -3022,6 +3036,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "sqlformat" version = "0.2.2" @@ -3649,6 +3669,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.4.1" @@ -3813,12 +3839,12 @@ dependencies = [ [[package]] name = "webpki" -version = "0.22.1" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0e74f82d49d545ad128049b7e88f6576df2da6b02e9ce565c6f533be576957e" +checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" dependencies = [ - "ring", - "untrusted", + "ring 0.17.7", + "untrusted 0.9.0", ] [[package]] diff --git a/conductor/tests/integration_tests.rs b/conductor/tests/integration_tests.rs index dafa94255..a6714a68d 100644 --- a/conductor/tests/integration_tests.rs +++ b/conductor/tests/integration_tests.rs @@ -17,7 +17,6 @@ mod test { }; use kube::{ - api::ListParams, runtime::wait::{await_condition, conditions}, Api, Client, Config, }; @@ -232,13 +231,6 @@ mod test { let pod_name = format!("{}-1", &namespace); pod_ready_and_running(pods.clone(), pod_name.clone()).await; - // Wait for postgres-exporter pod to be running - let lp = ListParams::default() - .labels(format!("app=postgres-exporter,coredb.io/name={}", namespace).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - // ADD AN EXTENSION - ASSERT IT MAKES IT TO STATUS.EXTENSIONS // conductor receives a CRUDevent from control plane // take note of number of extensions at this point in time @@ -297,9 +289,6 @@ mod test { thread::sleep(time::Duration::from_secs(40)); pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; // Get the last time the pod was started // using SELECT pg_postmaster_start_time(); @@ -348,9 +337,6 @@ mod test { } pod_ready_and_running(pods.clone(), pod_name).await; - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; // Get the last time the pod was started // using SELECT pg_postmaster_start_time(); @@ -407,10 +393,28 @@ mod test { use conductor::aws::cloudformation::AWSConfigState; let aws_region = "us-east-1".to_owned(); let region = Region::new(aws_region); - let aws_config_state = AWSConfigState::new(region).await; + let aws_config_state = AWSConfigState::new(region.clone()).await; let stack_name = format!("org-{}-inst-{}-cf", org_name, dbname); - let exists = aws_config_state.does_stack_exist(&stack_name).await; - assert!(!exists, "CF stack was not deleted"); + // let dcf = aws_config_state + // .delete_cloudformation_stack(&stack_name) + // .await; + // assert!(dcf); + // let exists = aws_config_state.does_stack_exist(&stack_name).await; + // assert!(!exists, "CF stack was not deleted"); + match aws_config_state + .delete_cloudformation_stack(&stack_name) + .await + { + Ok(_) => { + // If deletion was successful, check if the stack still exists + let stack_exists = aws_config_state.does_stack_exist(&stack_name).await; + assert!(!stack_exists, "CloudFormation stack was not deleted"); + } + Err(e) => { + // If there was an error deleting the stack, fail the test + panic!("Failed to delete CloudFormation stack: {:?}", e); + } + } } async fn kube_client() -> kube::Client { diff --git a/tembo-operator/Cargo.lock b/tembo-operator/Cargo.lock index a4de24304..3ce23d2c5 100644 --- a/tembo-operator/Cargo.lock +++ b/tembo-operator/Cargo.lock @@ -494,7 +494,7 @@ dependencies = [ [[package]] name = "controller" -version = "0.27.1" +version = "0.28.0" dependencies = [ "actix-web", "anyhow", diff --git a/tembo-operator/Cargo.toml b/tembo-operator/Cargo.toml index ccf151bfb..708a5b637 100644 --- a/tembo-operator/Cargo.toml +++ b/tembo-operator/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "controller" description = "Tembo Operator for Postgres" -version = "0.27.1" +version = "0.28.0" edition = "2021" default-run = "controller" license = "Apache-2.0" diff --git a/tembo-operator/src/apis/coredb_types.rs b/tembo-operator/src/apis/coredb_types.rs index 16f073c6d..77bd5d006 100644 --- a/tembo-operator/src/apis/coredb_types.rs +++ b/tembo-operator/src/apis/coredb_types.rs @@ -364,9 +364,8 @@ pub struct CoreDBSpec { #[serde(default = "defaults::default_pkglibdir_storage")] pub pkglibdirStorage: Quantity, - /// Enable the use of the Postgres Exporter deployment for metrics collection - /// - /// **Default**: true. + /// **DEPRECATED** Enable the use of the Postgres Exporter deployment for metrics collection + /// This is no longer used and will be removed in a future release. #[serde(default = "defaults::default_postgres_exporter_enabled")] pub postgresExporterEnabled: bool, @@ -379,9 +378,8 @@ pub struct CoreDBSpec { #[serde(default = "defaults::default_image")] pub image: String, - /// The postgres-exporter image you want to use for the postgres-exporter deployment. - /// - /// **Default**: quay.io/prometheuscommunity/postgres-exporter:v0.12.0 + /// **DEPRECATED** The postgres-exporter image you want to use for the postgres-exporter deployment. + /// This is no longer used and will be removed in a future release. #[serde(default = "defaults::default_postgres_exporter_image")] pub postgresExporterImage: String, diff --git a/tembo-operator/src/cloudnativepg/cnpg.rs b/tembo-operator/src/cloudnativepg/cnpg.rs index 1e964caf2..aaec71784 100644 --- a/tembo-operator/src/cloudnativepg/cnpg.rs +++ b/tembo-operator/src/cloudnativepg/cnpg.rs @@ -44,6 +44,7 @@ use crate::{ defaults::{default_image, default_llm_image}, errors::ValueError, is_postgres_ready, patch_cdb_status_merge, + postgres_exporter::EXPORTER_CONFIGMAP_PREFIX, psql::PsqlOutput, trunk::extensions_that_require_load, Context, RESTARTED_AT, @@ -601,6 +602,20 @@ pub fn cnpg_cluster_from_cdb( }) } + if cdb + .spec + .metrics + .as_ref() + .and_then(|m| m.queries.as_ref()) + .is_some() + { + let configmap = format!("{}{}", EXPORTER_CONFIGMAP_PREFIX, cdb.name_any()); + metrics.push(ClusterMonitoringCustomQueriesConfigMap { + key: "tembo-queries".to_string(), + name: configmap, + }) + } + Cluster { metadata: ObjectMeta { name: Some(name.clone()), diff --git a/tembo-operator/src/configmap.rs b/tembo-operator/src/configmap.rs index eb61060f6..7caece992 100644 --- a/tembo-operator/src/configmap.rs +++ b/tembo-operator/src/configmap.rs @@ -72,10 +72,13 @@ pub async fn apply_configmap( cm_name: &str, data: BTreeMap, ) -> Result<(), Error> { + let mut labels: BTreeMap = BTreeMap::new(); + labels.insert("cnpg.io/reload".to_owned(), "true".to_owned()); let cm_api: Api = Api::namespaced(client, namespace); let cm = ConfigMap { metadata: ObjectMeta { name: Some(cm_name.to_string()), + labels: Some(labels), ..Default::default() }, data: Some(data), diff --git a/tembo-operator/src/controller.rs b/tembo-operator/src/controller.rs index 446557e71..63eefd2f2 100644 --- a/tembo-operator/src/controller.rs +++ b/tembo-operator/src/controller.rs @@ -9,7 +9,6 @@ use crate::{ cnpg::{cnpg_cluster_from_cdb, reconcile_cnpg, reconcile_cnpg_scheduled_backup, reconcile_pooler}, }, config::Config, - deployment_postgres_exporter::reconcile_prometheus_exporter_deployment, exec::{ExecCommand, ExecOutput}, extensions::database_queries::is_not_restarting, heartbeat::reconcile_heartbeat, @@ -17,7 +16,6 @@ use crate::{ postgres_certificates::reconcile_certificates, psql::{PsqlCommand, PsqlOutput}, secret::{reconcile_postgres_role_secret, reconcile_secret}, - service::reconcile_prometheus_exporter_service, telemetry, Error, Metrics, Result, }; use k8s_openapi::{ @@ -43,7 +41,7 @@ use crate::{ extensions::{database_queries::list_config_params, reconcile_extensions}, ingress::{reconcile_extra_postgres_ing_route_tcp, reconcile_ip_allowlist_middleware}, network_policies::reconcile_network_policies, - postgres_exporter::reconcile_prom_configmap, + postgres_exporter::reconcile_metrics_configmap, trunk::{extensions_that_require_load, reconcile_trunk_configmap}, }; use rand::Rng; @@ -216,16 +214,15 @@ impl CoreDB { reconcile_app_services(self, ctx.clone()).await?; - if self.spec.postgresExporterEnabled - && self - .spec - .metrics - .as_ref() - .and_then(|m| m.queries.as_ref()) - .is_some() + if self + .spec + .metrics + .as_ref() + .and_then(|m| m.queries.as_ref()) + .is_some() { debug!("Reconciling prometheus configmap"); - reconcile_prom_configmap(self, client.clone(), &ns) + reconcile_metrics_configmap(self, client.clone(), &ns) .await .map_err(|e| { error!("Error reconciling prometheus configmap: {:?}", e); @@ -240,21 +237,6 @@ impl CoreDB { Action::requeue(Duration::from_secs(300)) })?; - // Postgres exporter connection info - if self.spec.postgresExporterEnabled { - let _ = reconcile_postgres_role_secret( - self, - ctx.clone(), - "postgres_exporter", - &format!("{}-exporter", name.clone()), - ) - .await - .map_err(|e| { - error!("Error reconciling postgres exporter secret: {:?}", e); - Action::requeue(Duration::from_secs(300)) - })?; - } - let _ = reconcile_postgres_role_secret(self, ctx.clone(), "readonly", &format!("{}-ro", name.clone())) .await @@ -270,22 +252,11 @@ impl CoreDB { reconcile_cnpg_scheduled_backup(self, ctx.clone()).await?; } - if self.spec.postgresExporterEnabled { - debug!("Reconciling prometheus exporter deployment"); - reconcile_prometheus_exporter_deployment(self, ctx.clone()) - .await - .map_err(|e| { - error!("Error reconciling prometheus exporter deployment: {:?}", e); - Action::requeue(Duration::from_secs(300)) - })?; - }; - - // reconcile service - debug!("Reconciling prometheus exporter service"); - reconcile_prometheus_exporter_service(self, ctx.clone()) + // Cleanup old Postgres Exporter Deployments, Service, ServiceAccount, Role and RoleBinding + crate::deployment_postgres_exporter::cleanup_postgres_exporter(self, ctx.clone()) .await .map_err(|e| { - error!("Error reconciling service: {:?}", e); + error!("Error reconciling prometheus exporter deployment: {:?}", e); Action::requeue(Duration::from_secs(300)) })?; diff --git a/tembo-operator/src/defaults.rs b/tembo-operator/src/defaults.rs index 0369439b6..e6565fc01 100644 --- a/tembo-operator/src/defaults.rs +++ b/tembo-operator/src/defaults.rs @@ -30,7 +30,7 @@ pub fn default_resources() -> ResourceRequirements { } pub fn default_postgres_exporter_enabled() -> bool { - true + false } pub fn default_uid() -> i32 { diff --git a/tembo-operator/src/deployment_postgres_exporter.rs b/tembo-operator/src/deployment_postgres_exporter.rs index 86bdd3865..1bdfd9af5 100644 --- a/tembo-operator/src/deployment_postgres_exporter.rs +++ b/tembo-operator/src/deployment_postgres_exporter.rs @@ -1,237 +1,267 @@ -use crate::{ - apis::coredb_types::CoreDB, - defaults::default_postgres_exporter_image, - postgres_exporter::{EXPORTER_CONFIGMAP_PREFIX, EXPORTER_VOLUME, QUERIES_YAML}, - rbac::reconcile_rbac, - Context, Error, Result, -}; -use k8s_openapi::{ - api::{ - apps::v1::{Deployment, DeploymentSpec}, - core::v1::{ - ConfigMapVolumeSource, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, PodSpec, - PodTemplateSpec, Probe, SecretKeySelector, SecurityContext, Volume, VolumeMount, - }, - rbac::v1::PolicyRule, - }, - apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, -}; -use kube::{ - api::{Api, ObjectMeta, Patch, PatchParams, ResourceExt}, - Resource, -}; -use std::{collections::BTreeMap, sync::Arc}; -use tracing::instrument; +use crate::{apis::coredb_types::CoreDB, Context, Error, Result}; +use k8s_openapi::api::apps::v1::Deployment; +use kube::api::{Api, ListParams, ResourceExt}; +use std::sync::Arc; +use tracing::{debug, error}; -const PROM_CFG_DIR: &str = "/prometheus"; +// const PROM_CFG_DIR: &str = "/prometheus"; -#[instrument(skip(cdb, ctx), fields(instance_name = %cdb.name_any()))] -pub async fn reconcile_prometheus_exporter_deployment(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { +// Top level function to cleanup all postgres-exporter resources +// this includes the deployment, service and rbac +pub async fn cleanup_postgres_exporter(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { + delete_postgres_exporter_deployment(cdb, ctx.clone()).await?; + crate::service::delete_postgres_exporter_service(cdb, ctx.clone()).await?; + crate::rbac::cleanup_postgres_exporter_rbac(cdb, ctx.clone()).await?; + Ok(()) +} + +// Delete the postgres-exporter Deployment from the cluster +async fn delete_postgres_exporter_deployment(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { let client = ctx.client.clone(); - let coredb_name = cdb.metadata.name.clone().expect("should always have a name"); let ns = cdb.namespace().unwrap(); - let name = format!("{}-metrics", cdb.name_any()); - let mut labels: BTreeMap = BTreeMap::new(); let deployment_api: Api = Api::namespaced(client, &ns); - let oref = cdb.controller_owner_ref(&()).unwrap(); - labels.insert("app".to_owned(), "postgres-exporter".to_string()); - labels.insert("component".to_owned(), "metrics".to_string()); - labels.insert("coredb.io/name".to_owned(), cdb.name_any()); - - // Format the postgres-exporter connection URI - // Check if cnpg is enabled, if so then set the URI to the cnpg service - // Otherwise, use the old coredb service - let psql_uri: String = format!("{}-rw.{}.svc.cluster.local:5432/postgres", cdb.name_any(), ns); - - // reconcile rbac(service account, role, role binding) for the postgres-exporter - let rbac = reconcile_rbac( - cdb, - ctx.clone(), - Some("metrics"), - create_policy_rules(name.clone()).await, - ) - .await?; - - // Generate the ObjectMeta for the Deployment - let deployment_metadata = ObjectMeta { - name: Some(name.to_owned()), - namespace: Some(ns.to_owned()), - labels: Some(labels.clone()), - owner_references: Some(vec![oref]), - ..ObjectMeta::default() - }; - // 0 replicas on deployment when stopping - // 1 replica in all other cases - let replicas = match cdb.spec.stop { - true => 0, - false => 1, - }; + // Define the label selector based on your deployment labels + let label_selector = + "app=postgres-exporter,component=metrics,coredb.io/name=".to_owned() + &cdb.name_any(); + let lp = ListParams::default().labels(&label_selector); - // Generate the Probe for the Container - let readiness_probe = Probe { - http_get: Some(HTTPGetAction { - path: Some("/metrics".to_string()), - port: IntOrString::String("metrics".to_string()), - ..HTTPGetAction::default() - }), - initial_delay_seconds: Some(3), - ..Probe::default() - }; - - // Generate ContainerPort for the Container - let container_port = vec![ContainerPort { - container_port: 9187, - name: Some("metrics".to_string()), - protocol: Some("TCP".to_string()), - ..ContainerPort::default() - }]; - - // Generate SecurityContext for the Container - let security_context = SecurityContext { - run_as_user: Some(65534), - allow_privilege_escalation: Some(false), - ..SecurityContext::default() - }; - - // Generate EnvVar for the Container - let env_vars = vec![ - EnvVar { - name: "DATA_SOURCE_URI".to_string(), - value: Some(psql_uri.clone()), - ..EnvVar::default() - }, - EnvVar { - name: "DATA_SOURCE_USER".to_string(), - value: Some("postgres_exporter".to_string()), - ..EnvVar::default() - }, - // Set EnvVar from a secret - EnvVar { - name: "DATA_SOURCE_PASS".to_string(), - value_from: Some(EnvVarSource { - secret_key_ref: Some(SecretKeySelector { - key: "password".to_string(), - name: Some(format!("{}-exporter", coredb_name.clone())), - optional: Some(false), - }), - ..EnvVarSource::default() - }), - ..EnvVar::default() - }, - EnvVar { - name: "PG_EXPORTER_EXTEND_QUERY_PATH".to_string(), - value: Some(format!("{PROM_CFG_DIR}/{QUERIES_YAML}")), - ..EnvVar::default() - }, - ]; - - // Generate VolumeMounts for the Container - let exporter_vol_mounts = if let Some(metrics) = &cdb.spec.metrics { - if metrics.queries.is_some() { - vec![VolumeMount { - name: EXPORTER_VOLUME.to_owned(), - mount_path: PROM_CFG_DIR.to_string(), - ..VolumeMount::default() - }] - } else { - vec![] - } - } else { - vec![] - }; + // List deployments with specified labels + let deployments = deployment_api.list(&lp).await?; - // Generate Volumes for the PodSpec - let exporter_volumes = if let Some(metrics) = &cdb.spec.metrics { - if metrics.queries.is_some() { - vec![Volume { - config_map: Some(ConfigMapVolumeSource { - name: Some(format!("{}{}", EXPORTER_CONFIGMAP_PREFIX.to_owned(), coredb_name)), - ..ConfigMapVolumeSource::default() - }), - name: EXPORTER_VOLUME.to_owned(), - ..Volume::default() - }] - } else { - vec![] + // Delete the deployment + for deployment in deployments { + if let Some(deployment_name) = deployment.metadata.name { + match deployment_api.delete(&deployment_name, &Default::default()).await { + Ok(_) => { + debug!( + "Deleted Deployment: {}, for instance {}", + deployment_name, + cdb.name_any() + ); + } + Err(e) => { + error!( + "Error deleting Deployment: {}, for instance {}", + e, + cdb.name_any() + ); + return Err(Error::KubeError(e)); + } + } } - } else { - vec![] - }; - - // Generate the PodSpec for the PodTemplateSpec - let pod_spec = PodSpec { - containers: vec![Container { - env: Some(env_vars), - image: Some(get_exporter_image(&cdb.clone())), - name: "postgres-exporter".to_string(), - ports: Some(container_port), - readiness_probe: Some(readiness_probe), - security_context: Some(security_context), - volume_mounts: Some(exporter_vol_mounts), - ..Container::default() - }], - service_account: rbac.service_account.metadata.name.clone(), - service_account_name: rbac.service_account.metadata.name.clone(), - volumes: Some(exporter_volumes), - ..PodSpec::default() - }; - - // Generate the PodTemplateSpec for the DeploymentSpec - let pod_template_spec = PodTemplateSpec { - metadata: Some(deployment_metadata.clone()), - spec: Some(pod_spec), - }; - - // Generate the DeploymentSpec for the Deployment - let deployment_spec = DeploymentSpec { - replicas: Some(replicas), - selector: LabelSelector { - match_labels: Some(labels.clone()), - ..LabelSelector::default() - }, - template: pod_template_spec, - ..DeploymentSpec::default() - }; - - // Generate the Deployment for Prometheus Exporter - let deployment = Deployment { - metadata: deployment_metadata, - spec: Some(deployment_spec), - ..Deployment::default() - }; - - let ps = PatchParams::apply("cntrlr").force(); - let _o = deployment_api - .patch(&name, &ps, &Patch::Apply(&deployment)) - .await - .map_err(Error::KubeError)?; + } Ok(()) } -// Generate the PolicyRules for the Role -#[instrument(fields(instance_name = %name))] -async fn create_policy_rules(name: String) -> Vec { - vec![ - // This policy allows get, watch access to a secret in the namespace - PolicyRule { - api_groups: Some(vec!["".to_owned()]), - resource_names: Some(vec![format!("{}", name)]), - resources: Some(vec!["secrets".to_owned()]), - verbs: vec!["get".to_string(), "watch".to_string()], - ..PolicyRule::default() - }, - ] -} - -fn get_exporter_image(cdb: &CoreDB) -> String { - // Check if cdb.spec.postgresExporterImage is set - // If so, use that image; otherwise, use the default - // image from default_postgres_exporter_image() function - if cdb.spec.postgresExporterImage.is_empty() { - default_postgres_exporter_image() - } else { - cdb.spec.postgresExporterImage.clone() - } -} +// #[instrument(skip(cdb, ctx), fields(instance_name = %cdb.name_any()))] +// pub async fn reconcile_prometheus_exporter_deployment(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { +// let client = ctx.client.clone(); +// let coredb_name = cdb.metadata.name.clone().expect("should always have a name"); +// let ns = cdb.namespace().unwrap(); +// let name = format!("{}-metrics", cdb.name_any()); +// let mut labels: BTreeMap = BTreeMap::new(); +// let deployment_api: Api = Api::namespaced(client, &ns); +// let oref = cdb.controller_owner_ref(&()).unwrap(); +// labels.insert("app".to_owned(), "postgres-exporter".to_string()); +// labels.insert("component".to_owned(), "metrics".to_string()); +// labels.insert("coredb.io/name".to_owned(), cdb.name_any()); +// +// // Format the postgres-exporter connection URI +// // Check if cnpg is enabled, if so then set the URI to the cnpg service +// // Otherwise, use the old coredb service +// let psql_uri: String = format!("{}-rw.{}.svc.cluster.local:5432/postgres", cdb.name_any(), ns); +// +// // reconcile rbac(service account, role, role binding) for the postgres-exporter +// let rbac = reconcile_rbac( +// cdb, +// ctx.clone(), +// Some("metrics"), +// create_policy_rules(name.clone()).await, +// ) +// .await?; +// +// // Generate the ObjectMeta for the Deployment +// let deployment_metadata = ObjectMeta { +// name: Some(name.to_owned()), +// namespace: Some(ns.to_owned()), +// labels: Some(labels.clone()), +// owner_references: Some(vec![oref]), +// ..ObjectMeta::default() +// }; +// +// // 0 replicas on deployment when stopping +// // 1 replica in all other cases +// let replicas = match cdb.spec.stop { +// true => 0, +// false => 1, +// }; +// +// // Generate the Probe for the Container +// let readiness_probe = Probe { +// http_get: Some(HTTPGetAction { +// path: Some("/metrics".to_string()), +// port: IntOrString::String("metrics".to_string()), +// ..HTTPGetAction::default() +// }), +// initial_delay_seconds: Some(3), +// ..Probe::default() +// }; +// +// // Generate ContainerPort for the Container +// let container_port = vec![ContainerPort { +// container_port: 9187, +// name: Some("metrics".to_string()), +// protocol: Some("TCP".to_string()), +// ..ContainerPort::default() +// }]; +// +// // Generate SecurityContext for the Container +// let security_context = SecurityContext { +// run_as_user: Some(65534), +// allow_privilege_escalation: Some(false), +// ..SecurityContext::default() +// }; +// +// // Generate EnvVar for the Container +// let env_vars = vec![ +// EnvVar { +// name: "DATA_SOURCE_URI".to_string(), +// value: Some(psql_uri.clone()), +// ..EnvVar::default() +// }, +// EnvVar { +// name: "DATA_SOURCE_USER".to_string(), +// value: Some("postgres_exporter".to_string()), +// ..EnvVar::default() +// }, +// // Set EnvVar from a secret +// EnvVar { +// name: "DATA_SOURCE_PASS".to_string(), +// value_from: Some(EnvVarSource { +// secret_key_ref: Some(SecretKeySelector { +// key: "password".to_string(), +// name: Some(format!("{}-exporter", coredb_name.clone())), +// optional: Some(false), +// }), +// ..EnvVarSource::default() +// }), +// ..EnvVar::default() +// }, +// EnvVar { +// name: "PG_EXPORTER_EXTEND_QUERY_PATH".to_string(), +// value: Some(format!("{PROM_CFG_DIR}/{QUERIES_YAML}")), +// ..EnvVar::default() +// }, +// ]; +// +// // Generate VolumeMounts for the Container +// let exporter_vol_mounts = if let Some(metrics) = &cdb.spec.metrics { +// if metrics.queries.is_some() { +// vec![VolumeMount { +// name: EXPORTER_VOLUME.to_owned(), +// mount_path: PROM_CFG_DIR.to_string(), +// ..VolumeMount::default() +// }] +// } else { +// vec![] +// } +// } else { +// vec![] +// }; +// +// // Generate Volumes for the PodSpec +// let exporter_volumes = if let Some(metrics) = &cdb.spec.metrics { +// if metrics.queries.is_some() { +// vec![Volume { +// config_map: Some(ConfigMapVolumeSource { +// name: Some(format!("{}{}", EXPORTER_CONFIGMAP_PREFIX.to_owned(), coredb_name)), +// ..ConfigMapVolumeSource::default() +// }), +// name: EXPORTER_VOLUME.to_owned(), +// ..Volume::default() +// }] +// } else { +// vec![] +// } +// } else { +// vec![] +// }; +// +// // Generate the PodSpec for the PodTemplateSpec +// let pod_spec = PodSpec { +// containers: vec![Container { +// env: Some(env_vars), +// image: Some(get_exporter_image(&cdb.clone())), +// name: "postgres-exporter".to_string(), +// ports: Some(container_port), +// readiness_probe: Some(readiness_probe), +// security_context: Some(security_context), +// volume_mounts: Some(exporter_vol_mounts), +// ..Container::default() +// }], +// service_account: rbac.service_account.metadata.name.clone(), +// service_account_name: rbac.service_account.metadata.name.clone(), +// volumes: Some(exporter_volumes), +// ..PodSpec::default() +// }; +// +// // Generate the PodTemplateSpec for the DeploymentSpec +// let pod_template_spec = PodTemplateSpec { +// metadata: Some(deployment_metadata.clone()), +// spec: Some(pod_spec), +// }; +// +// // Generate the DeploymentSpec for the Deployment +// let deployment_spec = DeploymentSpec { +// replicas: Some(replicas), +// selector: LabelSelector { +// match_labels: Some(labels.clone()), +// ..LabelSelector::default() +// }, +// template: pod_template_spec, +// ..DeploymentSpec::default() +// }; +// +// // Generate the Deployment for Prometheus Exporter +// let deployment = Deployment { +// metadata: deployment_metadata, +// spec: Some(deployment_spec), +// ..Deployment::default() +// }; +// +// let ps = PatchParams::apply("cntrlr").force(); +// let _o = deployment_api +// .patch(&name, &ps, &Patch::Apply(&deployment)) +// .await +// .map_err(Error::KubeError)?; +// +// Ok(()) +// } +// +// // Generate the PolicyRules for the Role +// #[instrument(fields(instance_name = %name))] +// async fn create_policy_rules(name: String) -> Vec { +// vec![ +// // This policy allows get, watch access to a secret in the namespace +// PolicyRule { +// api_groups: Some(vec!["".to_owned()]), +// resource_names: Some(vec![format!("{}", name)]), +// resources: Some(vec!["secrets".to_owned()]), +// verbs: vec!["get".to_string(), "watch".to_string()], +// ..PolicyRule::default() +// }, +// ] +// } +// +// fn get_exporter_image(cdb: &CoreDB) -> String { +// // Check if cdb.spec.postgresExporterImage is set +// // If so, use that image; otherwise, use the default +// // image from default_postgres_exporter_image() function +// if cdb.spec.postgresExporterImage.is_empty() { +// default_postgres_exporter_image() +// } else { +// cdb.spec.postgresExporterImage.clone() +// } +// } diff --git a/tembo-operator/src/postgres_exporter.rs b/tembo-operator/src/postgres_exporter.rs index ad4af1d7b..97c4be649 100644 --- a/tembo-operator/src/postgres_exporter.rs +++ b/tembo-operator/src/postgres_exporter.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use tracing::debug; -pub const QUERIES_YAML: &str = "queries.yaml"; +pub const QUERIES: &str = "tembo-queries"; pub const EXPORTER_VOLUME: &str = "postgres-exporter"; pub const EXPORTER_CONFIGMAP_PREFIX: &str = "metrics-"; @@ -81,10 +81,55 @@ pub struct Metrics { pub metrics: BTreeMap, } +/// **Example**: This example exposes specific metrics from a query to a +/// [pgmq](https://github.com/tembo-io/pgmq) queue enabled database. +/// +/// ```yaml +/// metrics: +/// enabled: true +/// image: quay.io/prometheuscommunity/postgres-exporter:v0.12.0 +/// queries: +/// pgmq: +/// query: select queue_name, queue_length, oldest_msg_age_sec, newest_msg_age_sec, total_messages from pgmq.metrics_all() +/// primary: true +/// metrics: +/// - queue_name: +/// description: Name of the queue +/// usage: LABEL +/// - queue_length: +/// description: Number of messages in the queue +/// usage: GAUGE +/// - oldest_msg_age_sec: +/// description: Age of the oldest message in the queue, in seconds. +/// usage: GAUGE +/// - newest_msg_age_sec: +/// description: Age of the newest message in the queue, in seconds. +/// usage: GAUGE +/// - total_messages: +/// description: Total number of messages that have passed into the queue. +/// usage: GAUGE +/// ``` #[derive(Clone, Debug, JsonSchema, PartialEq, Serialize, Deserialize)] pub struct QueryItem { + /// the SQL query to run on the target database to generate the metrics pub query: String, + + // We need to support this at some point going forward since master + // is now deprecated. + // whether to run the query only on the primary instance + //pub primary: Option, + + // same as primary (for compatibility with the Prometheus PostgreSQL + // exporter's syntax - **deprecated**) + /// whether to run the query only on the master instance + /// See [https://cloudnative-pg.io/documentation/1.20/monitoring/#structure-of-a-user-defined-metric](https://cloudnative-pg.io/documentation/1.20/monitoring/#structure-of-a-user-defined-metric) pub master: bool, + + /// the name of the column returned by the query + /// + /// usage: one of the values described below + /// description: the metric's description + /// metrics_mapping: the optional column mapping when usage is set to MAPPEDMETRIC pub metrics: Vec, } @@ -129,7 +174,7 @@ impl FromStr for Usage { } } -pub async fn reconcile_prom_configmap(cdb: &CoreDB, client: Client, ns: &str) -> Result<(), Error> { +pub async fn reconcile_metrics_configmap(cdb: &CoreDB, client: Client, ns: &str) -> Result<(), Error> { // set custom pg-prom metrics in configmap values if they are specified let coredb_name = cdb .metadata @@ -141,7 +186,7 @@ pub async fn reconcile_prom_configmap(cdb: &CoreDB, client: Client, ns: &str) -> match cdb.spec.metrics.clone().and_then(|m| m.queries) { Some(queries) => { let qdata = serde_yaml::to_string(&queries).unwrap(); - let d: BTreeMap = BTreeMap::from([(QUERIES_YAML.to_string(), qdata)]); + let d: BTreeMap = BTreeMap::from([(QUERIES.to_string(), qdata)]); apply_configmap( client.clone(), ns, diff --git a/tembo-operator/src/rbac.rs b/tembo-operator/src/rbac.rs index 36a10c535..2e17192a5 100644 --- a/tembo-operator/src/rbac.rs +++ b/tembo-operator/src/rbac.rs @@ -1,192 +1,310 @@ use crate::{apis::coredb_types::CoreDB, Context, Error}; -use k8s_openapi::{ - api::{ - core::v1::ServiceAccount, - rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject}, - }, - apimachinery::pkg::apis::meta::v1::ObjectMeta, +use k8s_openapi::api::{ + core::v1::ServiceAccount, + rbac::v1::{Role, RoleBinding}, }; -use kube::{ - api::{Patch, PatchParams}, - Api, ResourceExt, -}; -use std::{collections::BTreeMap, sync::Arc, vec}; +use kube::{api::ListParams, Api, ResourceExt}; +use std::sync::Arc; +use tracing::{debug, error}; -pub struct Rbac { - pub service_account: ServiceAccount, - pub role: Role, - pub rolebinding: RoleBinding, -} +// pub struct Rbac { +// pub service_account: ServiceAccount, +// pub role: Role, +// pub rolebinding: RoleBinding, +// } -// reconcile kubernetes rbac resources -pub async fn reconcile_rbac( - cdb: &CoreDB, - ctx: Arc, - suffix: Option<&str>, - policy_rules: Vec, -) -> Result { - // reconcile service account - let service_account = reconcile_service_account(cdb, ctx.clone(), suffix).await?; - let sa = service_account.clone(); - // reconcile role - let role = reconcile_role(cdb, ctx.clone(), suffix, policy_rules).await?; - let rle = role.clone(); - // reconcile role binding - let role_binding = reconcile_role_binding(cdb, ctx.clone(), service_account, rle.clone(), suffix).await?; - - Ok(Rbac { - service_account: sa, - role: rle, - rolebinding: role_binding, - }) +// Delete the postgres-exporter RBAC objects from the cluster +pub async fn cleanup_postgres_exporter_rbac(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { + delete_postgres_exporter_service_account(cdb, ctx.clone()).await?; + delete_postgres_exporter_role(cdb, ctx.clone()).await?; + delete_postgres_exporter_role_binding(cdb, ctx.clone()).await?; + Ok(()) } -// reconcile a kubernetes service account -async fn reconcile_service_account( - cdb: &CoreDB, - ctx: Arc, - suffix: Option<&str>, -) -> Result { - let suffix = suffix.map_or("sa".to_owned(), |s| { - if s.is_empty() { - "sa".to_owned() - } else { - s.to_owned() - } - }); +// Delete the postgres-exporter ServiceAccount from the cluster +async fn delete_postgres_exporter_service_account(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { let client = ctx.client.clone(); let ns = cdb.namespace().unwrap(); - let name = format!("{}-{}", cdb.name_any(), suffix); - let sa_api: Api = Api::namespaced(client.clone(), &ns); - - let mut labels: BTreeMap = BTreeMap::new(); - labels.insert("app".to_owned(), "coredb".to_string()); - labels.insert("coredb.io/name".to_owned(), cdb.name_any()); - - let mut sa_metadata = ObjectMeta { - name: Some(name.to_owned()), - namespace: Some(ns.to_owned()), - labels: Some(labels.clone()), - ..ObjectMeta::default() - }; - - if let Some(ref template_metadata) = cdb.spec.serviceAccountTemplate.metadata { - if let Some(ref annotations) = template_metadata.annotations { - sa_metadata.annotations = Some(annotations.clone()); - } - } + let service_account_api: Api = Api::namespaced(client, &ns); + + // Define the label selector based on your service account labels + let label_selector = "app=coredb,coredb.io/name=".to_owned() + &cdb.name_any(); + let lp = ListParams::default().labels(&label_selector); - let sa = ServiceAccount { - metadata: sa_metadata, - ..ServiceAccount::default() - }; + // List service accounts with specified labels + let service_accounts = service_account_api.list(&lp).await?; - let ps = PatchParams::apply("cntrlr").force(); - let _o = sa_api - .patch(&name, &ps, &Patch::Apply(&sa)) - .await - .map_err(Error::KubeError)?; + // Delete the service account + for service_account in service_accounts { + if let Some(service_account_name) = service_account.metadata.name { + match service_account_api + .delete(&service_account_name, &Default::default()) + .await + { + Ok(_) => { + debug!( + "Deleted ServiceAccount: {}, for instance {}", + service_account_name, + &cdb.name_any() + ); + } + Err(e) => { + error!( + "Error deleting ServiceAccount: {}, for instance {}", + e, + &cdb.name_any() + ); + return Err(Error::KubeError(e)); + } + } + } + } - Ok(sa) + Ok(()) } -async fn reconcile_role( - cdb: &CoreDB, - ctx: Arc, - suffix: Option<&str>, - policy_rules: Vec, -) -> Result { - let suffix = suffix.map_or("role".to_owned(), |s| { - if s.is_empty() { - "role".to_owned() - } else { - s.to_owned() - } - }); +// Delete the postgres-exporter Role from the cluster +async fn delete_postgres_exporter_role(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { let client = ctx.client.clone(); let ns = cdb.namespace().unwrap(); - let name = format!("{}-{}", cdb.name_any(), suffix); - let role_api: Api = Api::namespaced(client.clone(), &ns); - - let mut labels: BTreeMap = BTreeMap::new(); - labels.insert("app".to_owned(), "coredb".to_string()); - labels.insert("coredb.io/name".to_owned(), cdb.name_any()); - - let role = Role { - metadata: ObjectMeta { - name: Some(name.to_owned()), - namespace: Some(ns.to_owned()), - labels: Some(labels.clone()), - ..ObjectMeta::default() - }, - rules: Some(policy_rules.to_vec()), - }; - - let ps = PatchParams::apply("cntrlr").force(); - let _o = role_api - .patch(&name, &ps, &Patch::Apply(&role)) - .await - .map_err(Error::KubeError)?; - - Ok(role) -} + let role_api: Api = Api::namespaced(client, &ns); + + // Define the label selector based on your role labels + let label_selector = "app=coredb,coredb.io/name=".to_owned() + &cdb.name_any(); + let lp = ListParams::default().labels(&label_selector); + + // List roles with specified labels + let roles = role_api.list(&lp).await?; -async fn reconcile_role_binding( - cdb: &CoreDB, - ctx: Arc, - sa: ServiceAccount, - role: Role, - suffix: Option<&str>, -) -> Result { - let suffix = suffix.map_or("role-binding".to_owned(), |s| { - if s.is_empty() { - "role-binding".to_owned() - } else { - s.to_owned() + // Delete the role + for role in roles { + if let Some(role_name) = role.metadata.name { + match role_api.delete(&role_name, &Default::default()).await { + Ok(_) => { + debug!("Deleted Role: {} for instance {}", role_name, &cdb.name_any()); + } + Err(e) => { + error!("Error deleting Role: {}, for instance {}", e, &cdb.name_any()); + return Err(Error::KubeError(e)); + } + } } - }); + } + + Ok(()) +} + +// Delete the postgres-exporter RoleBinding from the cluster +async fn delete_postgres_exporter_role_binding(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { let client = ctx.client.clone(); let ns = cdb.namespace().unwrap(); - let name = format!("{}-{}", cdb.name_any(), suffix); - let role_binding_api: Api = Api::namespaced(client.clone(), &ns); - let sa_name = sa.name_any(); - let role_name = role.name_any(); - - let mut labels: BTreeMap = BTreeMap::new(); - labels.insert("app".to_owned(), "coredb".to_string()); - labels.insert("coredb.io/name".to_owned(), cdb.name_any()); - - let role_ref = RoleRef { - api_group: "rbac.authorization.k8s.io".to_string(), - kind: "Role".to_string(), - name: role_name.to_string(), - }; - - let subject = Subject { - kind: "ServiceAccount".to_string(), - name: sa_name.to_string(), - namespace: Some(ns.to_owned()), - ..Subject::default() - }; - - let metadata = ObjectMeta { - name: Some(name.to_owned()), - namespace: Some(ns.to_owned()), - labels: Some(labels.clone()), - ..ObjectMeta::default() - }; - - let rb = RoleBinding { - metadata, - role_ref, - subjects: Some(vec![subject]), - }; - - let ps = PatchParams::apply("cntrlr").force(); - let _o = role_binding_api - .patch(&name, &ps, &Patch::Apply(&rb)) - .await - .map_err(Error::KubeError)?; - - Ok(rb) + let role_binding_api: Api = Api::namespaced(client, &ns); + + // Define the label selector based on your role binding labels + let label_selector = "app=coredb,coredb.io/name=".to_owned() + &cdb.name_any(); + let lp = ListParams::default().labels(&label_selector); + + // List role bindings with specified labels + let role_bindings = role_binding_api.list(&lp).await?; + + // Delete the role binding + for role_binding in role_bindings { + if let Some(role_binding_name) = role_binding.metadata.name { + match role_binding_api + .delete(&role_binding_name, &Default::default()) + .await + { + Ok(_) => { + debug!( + "Deleted RoleBinding: {}, for instance {}", + role_binding_name, + &cdb.name_any() + ); + } + Err(e) => { + error!( + "Error deleting RoleBinding: {}, for instance {}", + e, + &cdb.name_any() + ); + return Err(Error::KubeError(e)); + } + } + } + } + + Ok(()) } + +// // reconcile kubernetes rbac resources +// pub async fn reconcile_rbac( +// cdb: &CoreDB, +// ctx: Arc, +// suffix: Option<&str>, +// policy_rules: Vec, +// ) -> Result { +// // reconcile service account +// let service_account = reconcile_service_account(cdb, ctx.clone(), suffix).await?; +// let sa = service_account.clone(); +// // reconcile role +// let role = reconcile_role(cdb, ctx.clone(), suffix, policy_rules).await?; +// let rle = role.clone(); +// // reconcile role binding +// let role_binding = reconcile_role_binding(cdb, ctx.clone(), service_account, rle.clone(), suffix).await?; +// +// Ok(Rbac { +// service_account: sa, +// role: rle, +// rolebinding: role_binding, +// }) +// } +// +// // reconcile a kubernetes service account +// async fn reconcile_service_account( +// cdb: &CoreDB, +// ctx: Arc, +// suffix: Option<&str>, +// ) -> Result { +// let suffix = suffix.map_or("sa".to_owned(), |s| { +// if s.is_empty() { +// "sa".to_owned() +// } else { +// s.to_owned() +// } +// }); +// let client = ctx.client.clone(); +// let ns = cdb.namespace().unwrap(); +// let name = format!("{}-{}", cdb.name_any(), suffix); +// let sa_api: Api = Api::namespaced(client.clone(), &ns); +// +// let mut labels: BTreeMap = BTreeMap::new(); +// labels.insert("app".to_owned(), "coredb".to_string()); +// labels.insert("coredb.io/name".to_owned(), cdb.name_any()); +// +// let mut sa_metadata = ObjectMeta { +// name: Some(name.to_owned()), +// namespace: Some(ns.to_owned()), +// labels: Some(labels.clone()), +// ..ObjectMeta::default() +// }; +// +// if let Some(ref template_metadata) = cdb.spec.serviceAccountTemplate.metadata { +// if let Some(ref annotations) = template_metadata.annotations { +// sa_metadata.annotations = Some(annotations.clone()); +// } +// } +// +// let sa = ServiceAccount { +// metadata: sa_metadata, +// ..ServiceAccount::default() +// }; +// +// let ps = PatchParams::apply("cntrlr").force(); +// let _o = sa_api +// .patch(&name, &ps, &Patch::Apply(&sa)) +// .await +// .map_err(Error::KubeError)?; +// +// Ok(sa) +// } +// +// async fn reconcile_role( +// cdb: &CoreDB, +// ctx: Arc, +// suffix: Option<&str>, +// policy_rules: Vec, +// ) -> Result { +// let suffix = suffix.map_or("role".to_owned(), |s| { +// if s.is_empty() { +// "role".to_owned() +// } else { +// s.to_owned() +// } +// }); +// let client = ctx.client.clone(); +// let ns = cdb.namespace().unwrap(); +// let name = format!("{}-{}", cdb.name_any(), suffix); +// let role_api: Api = Api::namespaced(client.clone(), &ns); +// +// let mut labels: BTreeMap = BTreeMap::new(); +// labels.insert("app".to_owned(), "coredb".to_string()); +// labels.insert("coredb.io/name".to_owned(), cdb.name_any()); +// +// let role = Role { +// metadata: ObjectMeta { +// name: Some(name.to_owned()), +// namespace: Some(ns.to_owned()), +// labels: Some(labels.clone()), +// ..ObjectMeta::default() +// }, +// rules: Some(policy_rules.to_vec()), +// }; +// +// let ps = PatchParams::apply("cntrlr").force(); +// let _o = role_api +// .patch(&name, &ps, &Patch::Apply(&role)) +// .await +// .map_err(Error::KubeError)?; +// +// Ok(role) +// } +// +// async fn reconcile_role_binding( +// cdb: &CoreDB, +// ctx: Arc, +// sa: ServiceAccount, +// role: Role, +// suffix: Option<&str>, +// ) -> Result { +// let suffix = suffix.map_or("role-binding".to_owned(), |s| { +// if s.is_empty() { +// "role-binding".to_owned() +// } else { +// s.to_owned() +// } +// }); +// let client = ctx.client.clone(); +// let ns = cdb.namespace().unwrap(); +// let name = format!("{}-{}", cdb.name_any(), suffix); +// let role_binding_api: Api = Api::namespaced(client.clone(), &ns); +// let sa_name = sa.name_any(); +// let role_name = role.name_any(); +// +// let mut labels: BTreeMap = BTreeMap::new(); +// labels.insert("app".to_owned(), "coredb".to_string()); +// labels.insert("coredb.io/name".to_owned(), cdb.name_any()); +// +// let role_ref = RoleRef { +// api_group: "rbac.authorization.k8s.io".to_string(), +// kind: "Role".to_string(), +// name: role_name.to_string(), +// }; +// +// let subject = Subject { +// kind: "ServiceAccount".to_string(), +// name: sa_name.to_string(), +// namespace: Some(ns.to_owned()), +// ..Subject::default() +// }; +// +// let metadata = ObjectMeta { +// name: Some(name.to_owned()), +// namespace: Some(ns.to_owned()), +// labels: Some(labels.clone()), +// ..ObjectMeta::default() +// }; +// +// let rb = RoleBinding { +// metadata, +// role_ref, +// subjects: Some(vec![subject]), +// }; +// +// let ps = PatchParams::apply("cntrlr").force(); +// let _o = role_binding_api +// .patch(&name, &ps, &Patch::Apply(&rb)) +// .await +// .map_err(Error::KubeError)?; +// +// Ok(rb) +// } diff --git a/tembo-operator/src/service.rs b/tembo-operator/src/service.rs index ef3945adf..460209113 100644 --- a/tembo-operator/src/service.rs +++ b/tembo-operator/src/service.rs @@ -1,63 +1,95 @@ use crate::{apis::coredb_types::CoreDB, Context, Error}; -use k8s_openapi::{ - api::core::v1::{Service, ServicePort, ServiceSpec}, - apimachinery::pkg::{apis::meta::v1::ObjectMeta, util::intstr::IntOrString}, -}; -use kube::{ - api::{Patch, PatchParams}, - Api, Resource, ResourceExt, -}; -use std::{collections::BTreeMap, sync::Arc}; -use tracing::instrument; +use k8s_openapi::api::core::v1::Service; +use kube::{api::ListParams, Api, ResourceExt}; +use std::sync::Arc; +use tracing::{debug, error}; -#[instrument(skip(cdb, ctx), fields(instance_name = %cdb.name_any()))] -pub async fn reconcile_prometheus_exporter_service(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { +// Delete the postgres-exporter service from the cluster +pub async fn delete_postgres_exporter_service(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { let client = ctx.client.clone(); let ns = cdb.namespace().unwrap(); - let name = cdb.name_any() + "-metrics"; - let svc_api: Api = Api::namespaced(client, &ns); - let oref = cdb.controller_owner_ref(&()).unwrap(); + let service_api: Api = Api::namespaced(client, &ns); - if !(cdb.spec.postgresExporterEnabled) { - // check if service exists and delete it - let _o = svc_api.delete(&name, &Default::default()).await; - return Ok(()); - } - - let mut selector_labels: BTreeMap = BTreeMap::new(); - selector_labels.insert("app".to_owned(), "postgres-exporter".to_string()); - selector_labels.insert("coredb.io/name".to_owned(), cdb.name_any()); - selector_labels.insert("component".to_owned(), "metrics".to_string()); - - let mut labels = selector_labels.clone(); - labels.insert("component".to_owned(), "metrics".to_owned()); + // Define the label selector based on your service labels + let label_selector = + "app=postgres-exporter,component=metrics,coredb.io/name=".to_owned() + &cdb.name_any(); + let lp = ListParams::default().labels(&label_selector); - let metrics_svc: Service = Service { - metadata: ObjectMeta { - name: Some(name.to_owned()), - namespace: Some(ns.to_owned()), - labels: Some(labels), - owner_references: Some(vec![oref]), - ..ObjectMeta::default() - }, - spec: Some(ServiceSpec { - ports: Some(vec![ServicePort { - port: 80, - name: Some("metrics".to_string()), - target_port: Some(IntOrString::String("metrics".to_string())), - ..ServicePort::default() - }]), - selector: Some(selector_labels), - ..ServiceSpec::default() - }), - ..Service::default() - }; + // List services with specified labels + let services = service_api.list(&lp).await?; - let ps = PatchParams::apply("cntrlr").force(); - let _o = svc_api - .patch(&name, &ps, &Patch::Apply(&metrics_svc)) - .await - .map_err(Error::KubeError)?; + // Delete the service + for service in services { + if let Some(service_name) = service.metadata.name { + match service_api.delete(&service_name, &Default::default()).await { + Ok(_) => { + debug!( + "Deleted Service: {}, for instance {}", + service_name, + cdb.name_any() + ); + } + Err(e) => { + error!("Error deleting Service: {}, for instance {}", e, cdb.name_any()); + return Err(Error::KubeError(e)); + } + } + } else { + println!("Found a service without a name, skipping..."); + } + } Ok(()) } + +// #[instrument(skip(cdb, ctx), fields(instance_name = %cdb.name_any()))] +// pub async fn reconcile_prometheus_exporter_service(cdb: &CoreDB, ctx: Arc) -> Result<(), Error> { +// let client = ctx.client.clone(); +// let ns = cdb.namespace().unwrap(); +// let name = cdb.name_any() + "-metrics"; +// let svc_api: Api = Api::namespaced(client, &ns); +// let oref = cdb.controller_owner_ref(&()).unwrap(); +// +// if !(cdb.spec.postgresExporterEnabled) { +// // check if service exists and delete it +// let _o = svc_api.delete(&name, &Default::default()).await; +// return Ok(()); +// } +// +// let mut selector_labels: BTreeMap = BTreeMap::new(); +// selector_labels.insert("app".to_owned(), "postgres-exporter".to_string()); +// selector_labels.insert("coredb.io/name".to_owned(), cdb.name_any()); +// selector_labels.insert("component".to_owned(), "metrics".to_string()); +// +// let mut labels = selector_labels.clone(); +// labels.insert("component".to_owned(), "metrics".to_owned()); +// +// let metrics_svc: Service = Service { +// metadata: ObjectMeta { +// name: Some(name.to_owned()), +// namespace: Some(ns.to_owned()), +// labels: Some(labels), +// owner_references: Some(vec![oref]), +// ..ObjectMeta::default() +// }, +// spec: Some(ServiceSpec { +// ports: Some(vec![ServicePort { +// port: 80, +// name: Some("metrics".to_string()), +// target_port: Some(IntOrString::String("metrics".to_string())), +// ..ServicePort::default() +// }]), +// selector: Some(selector_labels), +// ..ServiceSpec::default() +// }), +// ..Service::default() +// }; +// +// let ps = PatchParams::apply("cntrlr").force(); +// let _o = svc_api +// .patch(&name, &ps, &Patch::Apply(&metrics_svc)) +// .await +// .map_err(Error::KubeError)?; +// +// Ok(()) +// } diff --git a/tembo-operator/testdata/prometheus-stack.yaml b/tembo-operator/testdata/prometheus-stack.yaml index d78b9bb80..a7c0da6c4 100644 --- a/tembo-operator/testdata/prometheus-stack.yaml +++ b/tembo-operator/testdata/prometheus-stack.yaml @@ -18,7 +18,37 @@ kubeProxy: enabled: false nodeExporter: enabled: false +kubeControllerManager: + enabled: false +defaultRules: + create: true + rules: + alertmanager: false + etcd: false + configReloaders: false + general: false + k8s: true + kubeApiserver: false + kubeApiserverAvailability: false + kubeApiserverSlos: false + kubelet: true + kubeProxy: false + kubePrometheusGeneral: false + kubePrometheusNodeRecording: false + kubernetesApps: false + kubernetesResources: false + kubernetesStorage: false + kubernetesSystem: false + kubeScheduler: false + kubeStateMetrics: false + network: false + node: true + nodeExporterAlerting: false + nodeExporterRecording: true + prometheus: false + prometheusOperator: false prometheusOperator: + logLevel: debug resources: limits: cpu: 100m @@ -34,3 +64,8 @@ prometheus: limits: cpu: 1 memory: 1024Mi + prometheusSpec: + podMonitorSelectorNilUsesHelmValues: false + probeSelectorNilUsesHelmValues: false + ruleSelectorNilUsesHelmValues: false + serviceMonitorSelectorNilUsesHelmValues: false diff --git a/tembo-operator/tests/integration_tests.rs b/tembo-operator/tests/integration_tests.rs index 66db4c164..1ea4079d9 100644 --- a/tembo-operator/tests/integration_tests.rs +++ b/tembo-operator/tests/integration_tests.rs @@ -30,18 +30,13 @@ mod test { use k8s_openapi::{ api::{ apps::v1::Deployment, - core::v1::{ - Container, Namespace, PersistentVolumeClaim, Pod, PodSpec, ResourceRequirements, Secret, - Service, - }, + core::v1::{Namespace, PersistentVolumeClaim, Pod, ResourceRequirements, Secret, Service}, }, apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition, - apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::ObjectMeta, util::intstr::IntOrString}, + apimachinery::pkg::{api::resource::Quantity, util::intstr::IntOrString}, }; use kube::{ - api::{ - AttachParams, DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent, WatchParams, - }, + api::{AttachParams, DeleteParams, ListParams, Patch, PatchParams, WatchEvent, WatchParams}, runtime::wait::{await_condition, conditions, Condition}, Api, Client, Config, Error, }; @@ -62,7 +57,6 @@ mod test { // Timeout settings while waiting for an event const TIMEOUT_SECONDS_START_POD: u64 = 600; const TIMEOUT_SECONDS_POD_READY: u64 = 600; - const TIMEOUT_SECONDS_SECRET_PRESENT: u64 = 120; const TIMEOUT_SECONDS_NS_DELETED: u64 = 300; const TIMEOUT_SECONDS_POD_DELETED: u64 = 300; const TIMEOUT_SECONDS_COREDB_DELETED: u64 = 300; @@ -110,45 +104,6 @@ mod test { client } - fn wait_for_secret() -> impl Condition { - |obj: Option<&Secret>| { - if let Some(secret) = &obj { - if let Some(t) = &secret.type_ { - return t == "Opaque"; - } - } - false - } - } - - async fn create_test_buddy(pods_api: Api, name: String) -> String { - // Launch a pod we can connect to if we want to - // run commands inside the cluster. - let test_pod_name = format!("test-buddy-{}", name); - let pod = Pod { - metadata: ObjectMeta { - name: Some(test_pod_name.clone()), - ..ObjectMeta::default() - }, - spec: Some(PodSpec { - containers: vec![Container { - command: Some(vec!["sleep".to_string()]), - args: Some(vec!["1200".to_string()]), - name: "test-connection".to_string(), - image: Some("curlimages/curl:latest".to_string()), - ..Container::default() - }], - restart_policy: Some("Never".to_string()), - ..PodSpec::default() - }), - ..Pod::default() - }; - - let _pod = pods_api.create(&PostParams::default(), &pod).await.unwrap(); - - test_pod_name - } - async fn run_command_in_container( pods_api: Api, pod_name: String, @@ -582,6 +537,40 @@ mod test { false } + // Function to wait for metrics to appear + async fn wait_for_metric(pods: Api, pod_name: String, metric_name: &str) -> Result { + let max_retries = 15; // Adjust as needed + let wait_duration = Duration::from_secs(2); + + for attempt in 1..=max_retries { + let command = vec![String::from("curl"), "http://localhost:9187/metrics".to_string()]; + let result_stdout = run_command_in_container( + pods.clone(), + pod_name.clone(), + command, + Some("postgres".to_string()), + ) + .await; + + // Check if the result contains the expected metric + if result_stdout.contains(metric_name) { + return Ok(result_stdout); + } + + println!( + "Attempt {}/{}: Metric '{}' not found in output.", + attempt, max_retries, metric_name + ); + + tokio::time::sleep(wait_duration).await; + } + + Err(format!( + "Metric '{}' not found after {} attempts", + metric_name, max_retries + )) + } + #[tokio::test] #[ignore] async fn functional_test_basic_cnpg() { @@ -646,15 +635,6 @@ mod test { pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - let _ = wait_until_psql_contains( context.clone(), coredb_resource.clone(), @@ -664,37 +644,8 @@ mod test { ) .await; - let coredb_resource = coredbs.get(name).await.unwrap(); - let mut found_extension = false; - let mut retries = 0; - - while retries < 10 { - let status = &coredb_resource.status; - - if let Some(ref status) = status { - if let Some(ref extensions) = status.extensions { - for extension in extensions { - for location in &extension.locations { - if extension.name == "pg_jsonschema" && location.enabled.unwrap_or_default() { - found_extension = true; - assert_eq!(location.database, "postgres"); - assert_eq!( - location.schema.clone().unwrap_or_else(|| "public".to_string()), - "public" - ); - } - } - } - if found_extension { - break; - } - } - } - - // Sleep for a short duration before the next retry - tokio::time::sleep(Duration::from_secs(2)).await; - retries += 1; - } + // Wait for pg_jsonschema to be installed before proceeding. + let found_extension = trunk_install_status(&coredbs, name, "pg_jsonschema").await; assert!(found_extension); // Check for heartbeat table and values @@ -753,7 +704,6 @@ mod test { // Create a pod we can use to run commands in the cluster let pods: Api = Api::namespaced(client.clone(), &namespace); - let test_pod_name = create_test_buddy(pods.clone(), name.to_string()).await; // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); @@ -812,60 +762,12 @@ mod test { let patch = Patch::Apply(&coredb_json); let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap(); - // Wait for secret to be created - let secret_api: Api = Api::namespaced(client.clone(), &namespace); - let secret_name = format!("{}-exporter", name); - println!("Waiting for secret to be created: {}", secret_name); - let establish = await_condition(secret_api.clone(), &secret_name, wait_for_secret()); - let _ = tokio::time::timeout(Duration::from_secs(TIMEOUT_SECONDS_SECRET_PRESENT), establish) - .await - .unwrap_or_else(|_| { - panic!( - "Did not find the secret {} present after waiting {} seconds", - secret_name, TIMEOUT_SECONDS_SECRET_PRESENT - ) - }); - println!("Found secret: {}", secret_name); - - // assert for postgres-exporter secret to be created - let exporter_name = format!("{}-metrics", name); - let exporter_secret_name = format!("{}-exporter", name); - let exporter_secret = secret_api.get(&exporter_secret_name).await; - match exporter_secret { - Ok(secret) => { - // assert for non-empty data in the secret - assert!( - secret.data.map_or(false, |data| !data.is_empty()), - "postgres-exporter secret is empty!" - ); - } - Err(e) => panic!("Error getting postgres-exporter secret: {}", e), - } - // Wait for Pod to be created // This is the CNPG pod let pod_name = format!("{}-1", name); pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - - // assert that the postgres-exporter deployment was created - let deploy_api: Api = Api::namespaced(client.clone(), &namespace); - let exporter_deployment = deploy_api.get(exporter_name.clone().as_str()).await; - assert!( - exporter_deployment.is_ok(), - "postgres-exporter Deployment does not exist: {:?}", - exporter_deployment.err() - ); - // Assert default storage values are applied to PVC let pvc_api: Api = Api::namespaced(client.clone(), &namespace); let default_storage: Quantity = default_storage(); @@ -921,44 +823,26 @@ mod test { println!("{}", result.stdout.clone().unwrap()); assert!(result.stdout.clone().unwrap().contains("aggs_for_vecs")); - // Assert role 'postgres_exporter' was created - let result = psql_with_retry( - context.clone(), - coredb_resource.clone(), - "SELECT rolname FROM pg_roles;".to_string(), - ) - .await; - assert!( - result.stdout.clone().unwrap().contains("postgres_exporter"), - "results must contain postgres_exporter: {}", - result.stdout.clone().unwrap() - ); + // Check for metrics and availability + let metric_name = format!("cnpg_collector_up{{cluster=\"{}\"}} 1", name); + match wait_for_metric(pods.clone(), pod_name.to_string(), &metric_name).await { + Ok(result_stdout) => { + println!("Metric found: {}", result_stdout); + } + Err(e) => { + panic!("Failed to find metric: {}", e); + } + } - // Assert we can curl the metrics from the service - let metrics_service_name = format!("{}-metrics", name); - let command = vec![ - String::from("curl"), - format!("http://{metrics_service_name}/metrics"), - ]; - let result_stdout = - run_command_in_container(pods.clone(), test_pod_name.clone(), command, None).await; - assert!(result_stdout.contains("pg_up 1")); - println!("Found metrics when curling the metrics service"); - - // assert custom queries made it to metric server - let c = vec![ - "wget".to_owned(), - "-qO-".to_owned(), - "http://localhost:9187/metrics".to_owned(), - ]; - let result_stdout = run_command_in_container( - pods.clone(), - exporter_pod_name.to_string(), - c, - Some("postgres-exporter".to_string()), - ) - .await; - assert!(result_stdout.contains(&test_metric_decr)); + // Look for the custom metric + match wait_for_metric(pods.clone(), pod_name.to_string(), &test_metric_decr).await { + Ok(result_stdout) => { + println!("Metric found: {}", result_stdout); + } + Err(e) => { + panic!("Failed to find metric: {}", e); + } + } // Assert we can drop an extension after its been created let coredb_json = serde_json::json!({ @@ -1036,14 +920,6 @@ mod test { let s = storage.get("storage").unwrap().to_owned(); assert_eq!(Quantity("10Gi".to_owned()), s); - // Cleanup test buddy pod resource - pods.delete(&test_pod_name, &Default::default()).await.unwrap(); - println!("Waiting for test buddy pod to be deleted: {}", &test_pod_name); - let _assert_test_buddy_pod_deleted = tokio::time::timeout( - Duration::from_secs(TIMEOUT_SECONDS_POD_DELETED), - await_condition(pods.clone(), &test_pod_name, conditions::is_deleted("")), - ); - // Cleanup CoreDB resource coredbs.delete(name, &Default::default()).await.unwrap(); println!("Waiting for CoreDB to be deleted: {}", &name); @@ -2000,13 +1876,6 @@ mod test { pod_ready_and_running(pods.clone(), pod_name_primary.clone()).await; let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; // Assert that we can query the database with \dx; let result = psql_with_retry(context.clone(), coredb_resource.clone(), "\\dx".to_string()).await; @@ -2145,9 +2014,6 @@ mod test { let kind = "CoreDB"; let replicas = 1; - // Create a pod we can use to run commands in the cluster - let pods: Api = Api::namespaced(client.clone(), &namespace); - // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); let coredbs: Api = Api::namespaced(client.clone(), &namespace); @@ -2193,19 +2059,10 @@ mod test { let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap(); // Wait for CNPG Pod to be created + let pods: Api = Api::namespaced(client.clone(), &namespace); let pod_name = format!("{}-1", name); - pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - wait_until_psql_contains( context.clone(), coredb_resource.clone(), @@ -2288,9 +2145,6 @@ mod test { let kind = "CoreDB"; let replicas = 2; - // Create a pod we can use to run commands in the cluster - let pods: Api = Api::namespaced(client.clone(), &namespace); - // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); let coredbs: Api = Api::namespaced(client.clone(), &namespace); @@ -2308,17 +2162,7 @@ mod test { let params = PatchParams::apply("tembo-integration-test"); let patch = Patch::Apply(&coredb_json); let coredb_resource = coredbs.patch(name, ¶ms, &patch).await.unwrap(); - - // Wait for CNPG Pod to be created - let pod_name_primary = format!("{}-1", name); - pod_ready_and_running(pods.clone(), pod_name_primary.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); // Wait for CNPG Cluster to be created by looping over replicas until // they are in a running state @@ -2331,6 +2175,11 @@ mod test { let result = psql_with_retry(context.clone(), coredb_resource.clone(), "\\dx".to_string()).await; assert!(result.stdout.clone().unwrap().contains("plpgsql")); + for i in 1..=replicas { + let pod_name = format!("{}-{}", name, i); + pod_ready_and_running(pods.clone(), pod_name).await; + } + // Assert that both pods are replicating successfully let result = psql_with_retry( context.clone(), @@ -2450,8 +2299,6 @@ mod test { // Create a pod we can use to run commands in the cluster let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); @@ -2477,9 +2324,6 @@ mod test { let pod_name = format!("{}-{}", name, i); pod_ready_and_running(pods.clone(), pod_name).await; } - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; let _result = wait_until_psql_contains( context.clone(), @@ -2672,8 +2516,6 @@ mod test { // Create a pod we can use to run commands in the cluster let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); // Apply a basic configuration of CoreDB println!("Creating CoreDB resource {}", name); @@ -2745,9 +2587,6 @@ mod test { let pod_name = format!("{}-{}", name, i); pod_ready_and_running(pods.clone(), pod_name).await; } - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; // Assert that we can query the database with \dx; let result = psql_with_retry(context.clone(), coredb_resource.clone(), "\\dx".to_string()).await; @@ -2783,9 +2622,6 @@ mod test { let pod_name = format!("{}-{}", name, i); pod_ready_and_running(pods.clone(), pod_name).await; } - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; for pod in &retrieved_pods { let cmd = vec![ @@ -3904,15 +3740,6 @@ CREATE EVENT TRIGGER pgrst_watch pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - // Assert status contains configs let mut found_configs = false; let expected_config = ConfigValue::Multiple(BTreeSet::from_iter(vec![ @@ -4079,12 +3906,6 @@ CREATE EVENT TRIGGER pgrst_watch pod_ready_and_running(pods.clone(), pod_name.clone()).await; let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - // Wait for CNPG Cluster to be created by looping over replicas until // they are in a running state for i in 1..=replicas { @@ -4268,11 +4089,6 @@ CREATE EVENT TRIGGER pgrst_watch pod_ready_and_running(restore_pods.clone(), restore_pod_name.clone()).await; let restore_pods: Api = Api::namespaced(client.clone(), &restore_namespace); - let lp = ListParams::default() - .labels(format!("app=postgres-exporter,coredb.io/name={}", restore_name).as_str()); - let exporter_pods = restore_pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); // Wait for CNPG Cluster to be created by looping over replicas until // they are in a running state @@ -4433,15 +4249,6 @@ CREATE EVENT TRIGGER pgrst_watch pod_ready_and_running(pods.clone(), pod_name.clone()).await; - let pods: Api = Api::namespaced(client.clone(), &namespace); - let lp = - ListParams::default().labels(format!("app=postgres-exporter,coredb.io/name={}", name).as_str()); - let exporter_pods = pods.list(&lp).await.expect("could not get pods"); - let exporter_pod_name = exporter_pods.items[0].metadata.name.as_ref().unwrap(); - println!("Exporter pod name: {}", &exporter_pod_name); - - pod_ready_and_running(pods.clone(), exporter_pod_name.clone()).await; - // Check for pooler let pooler_name = format!("{}-pooler", name); let poolers: Api = Api::namespaced(client.clone(), &namespace); diff --git a/tembo-operator/yaml/sample-message-queue.yaml b/tembo-operator/yaml/sample-message-queue.yaml index 04b7aadee..79e25ef3d 100644 --- a/tembo-operator/yaml/sample-message-queue.yaml +++ b/tembo-operator/yaml/sample-message-queue.yaml @@ -36,7 +36,7 @@ spec: value: all trunk_installs: - name: pgmq - version: 0.32.1 + version: 1.1.0 - name: pg_partman version: 4.7.3 extensions: @@ -55,7 +55,7 @@ spec: image: quay.io/prometheuscommunity/postgres-exporter:v0.12.0 queries: pgmq: - query: select queue_name, queue_length, oldest_msg_age_sec, newest_msg_age_sec,total_messages from public.pgmq_metrics_all() + query: select queue_name, queue_length, oldest_msg_age_sec, newest_msg_age_sec, total_messages from public.pgmq_metrics_all() master: true metrics: - queue_name: