diff --git a/conductor/justfile b/conductor/justfile
index d057e4caa..0dcf9f100 100644
--- a/conductor/justfile
+++ b/conductor/justfile
@@ -80,6 +80,7 @@ watch:
   BACKUP_ARCHIVE_BUCKET=cdb-plat-use1-dev-instance-backups \
   STORAGE_ARCHIVE_BUCKET=cdb-plat-use1-dev-instance-storage \
   IS_CLOUD_FORMATION=false \
+  IS_GCP=false \
   cargo watch -x run

 run-postgres:
diff --git a/conductor/src/cloud.rs b/conductor/src/cloud.rs
new file mode 100644
index 000000000..c48c5eee3
--- /dev/null
+++ b/conductor/src/cloud.rs
@@ -0,0 +1,62 @@
+// Builder for selecting which cloud provider conductor is running against
+pub struct CloudProviderBuilder {
+    gcp: bool,
+    aws: bool,
+}
+
+impl CloudProviderBuilder {
+    fn new() -> Self {
+        CloudProviderBuilder {
+            gcp: false,
+            aws: false,
+        }
+    }
+
+    pub fn gcp(mut self, value: bool) -> Self {
+        self.gcp = value;
+        self
+    }
+
+    pub fn aws(mut self, value: bool) -> Self {
+        self.aws = value;
+        self
+    }
+
+    pub fn build(self) -> CloudProvider {
+        if self.gcp {
+            CloudProvider::GCP
+        } else if self.aws {
+            CloudProvider::AWS
+        } else {
+            CloudProvider::Unknown
+        }
+    }
+}
+
+pub enum CloudProvider {
+    AWS,
+    GCP,
+    Unknown,
+}
+
+impl CloudProvider {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            CloudProvider::AWS => "aws",
+            CloudProvider::GCP => "gcp",
+            CloudProvider::Unknown => "unknown",
+        }
+    }
+
+    pub fn prefix(&self) -> &'static str {
+        match self {
+            CloudProvider::AWS => "s3://",
+            CloudProvider::GCP => "gs://",
+            CloudProvider::Unknown => "",
+        }
+    }
+
+    pub fn builder() -> CloudProviderBuilder {
+        CloudProviderBuilder::new()
+    }
+}
diff --git a/conductor/src/errors.rs b/conductor/src/errors.rs
index 7d8199264..9c5513c91 100644
--- a/conductor/src/errors.rs
+++ b/conductor/src/errors.rs
@@ -53,4 +53,8 @@ pub enum ConductorError {
     /// Google Cloud Storage error
     #[error("Google Cloud Storage error: {0}")]
     GcsError(#[from] GcsError),
+
+    /// Dataplane error
+    #[error("Dataplane not found error: {0}")]
+    DataplaneError(String),
 }
diff --git a/conductor/src/lib.rs b/conductor/src/lib.rs
index 65321f4cc..8a8dca781 100644
--- a/conductor/src/lib.rs
+++ b/conductor/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod aws;
+pub mod cloud;
 pub mod errors;
 pub mod extensions;
 pub mod gcp;
@@ -7,7 +8,10 @@ pub mod monitoring;
 pub mod routes;
 pub mod types;

-use crate::aws::cloudformation::{AWSConfigState, CloudFormationParams};
+use crate::{
+    aws::cloudformation::{AWSConfigState, CloudFormationParams},
+    cloud::CloudProvider,
+};
 use aws_sdk_cloudformation::config::Region;
 use controller::apis::coredb_types::{CoreDB, CoreDBSpec};
 use errors::ConductorError;
@@ -18,7 +22,7 @@ use kube::api::{DeleteParams, ListParams, Patch, PatchParams};

 use chrono::{DateTime, SecondsFormat, Utc};
 use kube::{Api, Client, ResourceExt};
-use log::{debug, info};
+use log::{debug, info, warn};
 use serde_json::{from_str, to_string, Value};
 use std::{
     collections::hash_map::DefaultHasher,
@@ -27,6 +31,7 @@ use std::{

 pub type Result<T, E = ConductorError> = std::result::Result<T, E>;

+#[allow(clippy::too_many_arguments)]
 pub async fn generate_spec(
     org_id: &str,
     entity_name: &str,
@@ -35,18 +40,38 @@ pub async fn generate_spec(
     namespace: &str,
     backups_bucket: &str,
     spec: &CoreDBSpec,
-) -> Value {
+    cloud_provider: &CloudProvider,
+) -> Result<Value> {
     let mut spec = spec.clone();
-    // Add the bucket name into the backups_path if it's not already there
-    if let Some(restore) = &mut spec.restore {
-        if let Some(backups_path) = &mut restore.backups_path {
-            if !backups_path.starts_with(&format!("s3://{}", backups_bucket)) {
-                let path_suffix = backups_path.trim_start_matches("s3://");
-                *backups_path = format!("s3://{}/{}", backups_bucket, path_suffix);
+
+    match cloud_provider {
+        CloudProvider::AWS | CloudProvider::GCP => {
+            let prefix = cloud_provider.prefix();
+
+            // Format the backups_path with the correct prefix
+            if let Some(restore) = &mut spec.restore {
+                if let Some(backups_path) = &mut restore.backups_path {
+                    let clean_path = remove_known_prefixes(backups_path);
+                    if clean_path.starts_with(backups_bucket) {
+                        // If the path already includes the bucket, just add the prefix
+                        *backups_path = format!("{}{}", prefix, clean_path);
+                    } else {
+                        // If the path doesn't include the bucket, add both prefix and bucket
+                        *backups_path = format!("{}{}/{}", prefix, backups_bucket, clean_path);
+                    }
+                }
             }
         }
+        CloudProvider::Unknown => {
+            warn!(
+                "Unknown or disabled cloud provider; removing restore configuration from the spec",
+            );
+            // Remove the restore information if the cloud provider is unknown
+            spec.restore = None;
+        }
     }
-    serde_json::json!({
+
+    Ok(serde_json::json!({
         "apiVersion": "coredb.io/v1alpha1",
         "kind": "CoreDB",
         "metadata": {
@@ -59,7 +84,18 @@
             }
         },
         "spec": spec,
-    })
+    }))
+}
+
+// Remove known prefixes from the backup path
+fn remove_known_prefixes(path: &str) -> &str {
+    let known_prefixes = ["s3://", "gs://", "https://"];
+    for prefix in &known_prefixes {
+        if let Some(stripped) = path.strip_prefix(prefix) {
+            return stripped;
+        }
+    }
+    path
 }

 pub fn get_data_plane_id_from_coredb(coredb: &CoreDB) -> Result> {
@@ -636,16 +672,19 @@ mod tests {
             }),
             ..CoreDBSpec::default()
         };
+        let cloud_provider = CloudProvider::AWS;
         let result = generate_spec(
             "org-id",
             "entity-name",
             "instance-id",
-            "data-plane-id",
+            "aws_data_1_use1",
             "namespace",
             "my-bucket",
             &spec,
+            &cloud_provider,
         )
-        .await;
+        .await
+        .expect("Failed to generate spec");
         let expected_backups_path = "s3://my-bucket/coredb/coredb/org-coredb-inst-pgtrunkio-dev";
         assert_eq!(
             result["spec"]["restore"]["backupsPath"].as_str().unwrap(),
@@ -664,16 +703,19 @@
             }),
             ..CoreDBSpec::default()
         };
+        let cloud_provider = CloudProvider::AWS;
         let result = generate_spec(
             "org-id",
             "entity-name",
             "instance-id",
-            "data-plane-id",
+            "aws_data_1_use1",
             "namespace",
             "my-bucket",
             &spec,
+            &cloud_provider,
         )
-        .await;
+        .await
+        .expect("Failed to generate spec");
         let expected_backups_path = "s3://my-bucket/coredb/coredb/org-coredb-inst-pgtrunkio-dev";
         assert_eq!(
             result["spec"]["restore"]["backupsPath"].as_str().unwrap(),
@@ -690,16 +732,19 @@
             }),
             ..CoreDBSpec::default()
         };
+        let cloud_provider = CloudProvider::AWS;
         let result = generate_spec(
             "org-id",
             "entity-name",
             "instance-id",
-            "data-plane-id",
+            "aws_data_1_use1",
             "namespace",
             "my-bucket",
             &spec,
+            &cloud_provider,
         )
-        .await;
+        .await
+        .expect("Failed to generate spec");
         assert!(result["spec"]["restore"]["backupsPath"].is_null());
     }

@@ -709,16 +754,77 @@
             restore: None,
             ..CoreDBSpec::default()
         };
+        let cloud_provider = CloudProvider::AWS;
         let result = generate_spec(
             "org-id",
             "entity-name",
             "instance-id",
-            "data-plane-id",
+            "aws_data_1_use1",
             "namespace",
             "my-bucket",
             &spec,
+            &cloud_provider,
         )
-        .await;
+        .await
+        .expect("Failed to generate spec");
         assert!(result["spec"]["restore"].is_null());
     }
+
+    #[tokio::test]
+    async fn test_generate_spec_with_non_matching_gcp_bucket() {
+        let spec = CoreDBSpec {
+            restore: Some(Restore {
+                backups_path: Some("gs://v2/test-instance".to_string()),
+                ..Restore::default()
+            }),
+            ..CoreDBSpec::default()
+        };
+        let cloud_provider = CloudProvider::GCP;
+        let result = generate_spec(
+            "org-id",
+            "entity-name",
+            "instance-id",
+            "gcp_data_1_usc1",
+            "namespace",
+            "my-bucket",
+            &spec,
+            &cloud_provider,
+        )
+        .await
+        .expect("Failed to generate spec");
+        let expected_backups_path = "gs://my-bucket/v2/test-instance";
+        assert_eq!(
+            result["spec"]["restore"]["backupsPath"].as_str().unwrap(),
+            expected_backups_path
+        );
+    }
+
+    #[tokio::test]
+    async fn test_generate_spec_with_gcp_bucket() {
+        let spec = CoreDBSpec {
+            restore: Some(Restore {
+                backups_path: Some("gs://my-bucket/v2/test-instance".to_string()),
+                ..Restore::default()
+            }),
+            ..CoreDBSpec::default()
+        };
+        let cloud_provider = CloudProvider::GCP;
+        let result = generate_spec(
+            "org-id",
+            "entity-name",
+            "instance-id",
+            "gcp_data_1_usc1",
+            "namespace",
+            "my-bucket",
+            &spec,
+            &cloud_provider,
+        )
+        .await
+        .expect("Failed to generate spec");
+        let expected_backups_path = "gs://my-bucket/v2/test-instance";
+        assert_eq!(
+            result["spec"]["restore"]["backupsPath"].as_str().unwrap(),
+            expected_backups_path
+        );
+    }
 }
diff --git a/conductor/src/main.rs b/conductor/src/main.rs
index 8d0c453c6..031b4023b 100644
--- a/conductor/src/main.rs
+++ b/conductor/src/main.rs
@@ -3,10 +3,11 @@ use actix_web_opentelemetry::{PrometheusMetricsHandler, RequestTracing};
 use conductor::errors::ConductorError;
 use conductor::monitoring::CustomMetrics;
 use conductor::{
-    create_cloudformation, create_gcp_storage_workload_identity_binding, create_namespace,
-    create_or_update, delete, delete_cloudformation, delete_gcp_storage_workload_identity_binding,
-    delete_namespace, generate_cron_expression, generate_spec, get_coredb_error_without_status,
-    get_one, get_pg_conn, lookup_role_arn, restart_coredb, types,
+    cloud::CloudProvider, create_cloudformation, create_gcp_storage_workload_identity_binding,
+    create_namespace, create_or_update, delete, delete_cloudformation,
+    delete_gcp_storage_workload_identity_binding, delete_namespace, generate_cron_expression,
+    generate_spec, get_coredb_error_without_status, get_one, get_pg_conn, lookup_role_arn,
+    restart_coredb, types,
 };

 use crate::metrics_reporter::run_metrics_reporter;
@@ -137,6 +138,12 @@ async fn run(metrics: CustomMetrics) -> Result<(), ConductorError> {

     log::info!("Database migrations have been successfully applied.");

+    // Determine the cloud provider using the builder
+    let cloud_provider = CloudProvider::builder()
+        .gcp(is_gcp)
+        .aws(is_cloud_formation)
+        .build();
+
     loop {
         // Read from queue (check for new message)
         // messages that dont fit a CRUDevent will error
@@ -357,8 +364,9 @@
                     &namespace,
                     &backup_archive_bucket,
                     &coredb_spec,
+                    &cloud_provider,
                 )
-                .await;
+                .await?;

                 info!("{}: Creating or updating spec", read_msg.msg_id);
                 // create or update CoreDB
diff --git a/conductor/tests/integration_tests.rs b/conductor/tests/integration_tests.rs
index d4f90f4d1..d8e855fee 100644
--- a/conductor/tests/integration_tests.rs
+++ b/conductor/tests/integration_tests.rs
@@ -157,7 +157,7 @@ mod test {
         namespace: namespace.clone(),
         backups_read_path: None,
         backups_write_path: None,
-        data_plane_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
+        data_plane_id: "aws_data_1_use1".to_owned(),
         org_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
         inst_id: "inst_02s4UKVbRy34SAYVSwZq2H".to_owned(),
         event_type: types::Event::Create,
@@ -256,7 +256,7 @@
         namespace: namespace.clone(),
         backups_read_path: None,
         backups_write_path: None,
-        data_plane_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
+        data_plane_id: "aws_data_1_use1".to_owned(),
         org_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
         inst_id: "inst_02s4UKVbRy34SAYVSwZq2H".to_owned(),
         event_type: types::Event::Update,
@@ -328,7 +328,7 @@
         namespace: namespace.clone(),
         backups_read_path: None,
         backups_write_path: None,
-        data_plane_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
+        data_plane_id: "aws_data_1_use1".to_owned(),
         org_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
         inst_id: "inst_02s4UKVbRy34SAYVSwZq2H".to_owned(),
         event_type: types::Event::Restart,
@@ -380,7 +380,7 @@
         namespace: namespace.clone(),
         backups_write_path: None,
         backups_read_path: None,
-        data_plane_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
+        data_plane_id: "aws_data_1_use1".to_owned(),
         org_id: "org_02s3owPQskuGXHE8vYsGSY".to_owned(),
         inst_id: "inst_02s4UKVbRy34SAYVSwZq2H".to_owned(),
         event_type: types::Event::Delete,
diff --git a/inference-gateway/src/config.rs b/inference-gateway/src/config.rs
index 734823ac0..623308448 100644
--- a/inference-gateway/src/config.rs
+++ b/inference-gateway/src/config.rs
@@ -9,6 +9,8 @@ pub struct Config {
     pub llm_service_host_port: Url,
     /// Postgres connection string to the timeseries databse which logs token usage
     pub pg_conn_str: String,
+    /// Postgres connection string for the Control Plane queue
+    pub billing_queue_conn_str: String,
     /// Port to run the inference gateway on
     pub server_port: u16,
     /// Number of actix workers to spawn
@@ -29,6 +31,10 @@ impl Config {
                 "DATABASE_URL",
                 "postgresql://postgres:postgres@0.0.0.0:5432/postgres",
             ),
+            billing_queue_conn_str: from_env_default(
+                "QUEUE_CONN_URL",
+                "postgresql://postgres:postgres@0.0.0.0:5432/postgres",
+            ),
             server_port: from_env_default("WEBSERVER_PORT", "8080")
                 .parse::<u16>()
                 .unwrap_or(8080),
diff --git a/inference-gateway/src/daemon.rs b/inference-gateway/src/daemon.rs
index 47749509a..6dd9158c5 100644
--- a/inference-gateway/src/daemon.rs
+++ b/inference-gateway/src/daemon.rs
@@ -21,10 +21,13 @@ async fn main() -> Result<(), Box> {
     info!("Spawning AI billing reporter thread");

     let pg_conn = cfg.pg_conn_str.clone();
+    let billing_queue_conn = cfg.billing_queue_conn_str.clone();

     background_threads_guard.push(tokio::spawn(async move {
         loop {
-            if let Err(err) = run_events_reporter(pg_conn.clone()).await {
+            if let Err(err) =
+                run_events_reporter(pg_conn.clone(), billing_queue_conn.clone()).await
+            {
                 log::error!("Tembo AI billing reporter error: {err}");
                 log::info!("Restarting Tembo AI billing reporter in 30 sec");
                 tokio::time::sleep(Duration::from_secs(30)).await;
diff --git a/inference-gateway/src/events_reporter.rs b/inference-gateway/src/events_reporter.rs
index a12582fda..cdd4da243 100644
--- a/inference-gateway/src/events_reporter.rs
+++ b/inference-gateway/src/events_reporter.rs
@@ -116,21 +116,20 @@ fn get_hourly_chunks(
     while chunk_start < last_complete_hour {
         let chunk_end = chunk_start + ChronoDuration::hours(1) - ChronoDuration::nanoseconds(1);
         chunks.push((chunk_start, chunk_end));
-        chunk_start = chunk_start + ChronoDuration::hours(1);
+        chunk_start += ChronoDuration::hours(1);
     }

     chunks
 }

-pub async fn run_events_reporter(pg_conn: String) -> Result<()> {
+pub async fn run_events_reporter(pg_conn: String, billing_queue_conn: String) -> Result<()> {
     // Run once per hour
     const SYNC_PERIOD: Duration = Duration::from_secs(60 * 60);

     const BILLING_QUEUE: &str = "billing_aws_data_1_use1";

-    let pool = db::connect(&pg_conn, 2).await?;
-
-    let queue = PGMQueueExt::new(pg_conn, 2).await?;
+    let inference_pool = db::connect(&pg_conn, 2).await?;
+    let queue = PGMQueueExt::new(billing_queue_conn, 2).await?;
     queue.init().await?;
     queue.create(BILLING_QUEUE).await?;

@@ -139,7 +138,7 @@
     loop {
         sync_interval.tick().await;

-        let last_reported_at = get_reporter_watermark(&pool)
+        let last_reported_at = get_reporter_watermark(&inference_pool)
             .await?
             .map(|watermark| watermark.last_reported_at)
             .unwrap_or(Utc::now() - Duration::from_secs(60 * 60));
@@ -148,7 +147,7 @@
         let chunks = get_hourly_chunks(last_reported_at, now);

         for (start_time, end_time) in chunks {
-            enqueue_event(&pool, &queue, BILLING_QUEUE, start_time, end_time).await?;
+            enqueue_event(&inference_pool, &queue, BILLING_QUEUE, start_time, end_time).await?;
         }
     }
 }
diff --git a/tembo-stacks/Cargo.lock b/tembo-stacks/Cargo.lock
index bf67e71be..e7bcfbeaf 100644
--- a/tembo-stacks/Cargo.lock
+++ b/tembo-stacks/Cargo.lock
@@ -2471,7 +2471,7 @@ dependencies = [

 [[package]]
 name = "tembo-stacks"
-version = "0.16.2"
+version = "0.16.3"
 dependencies = [
  "anyhow",
  "clap",
diff --git a/tembo-stacks/Cargo.toml b/tembo-stacks/Cargo.toml
index 5c2ca9a55..6206a76f3 100644
--- a/tembo-stacks/Cargo.toml
+++ b/tembo-stacks/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "tembo-stacks"
 description = "Tembo Stacks for Postgres"
-version = "0.16.2"
+version = "0.16.3"
 authors = ["tembo.io"]
 edition = "2021"
 license = "Apache-2.0"
diff --git a/tembo-stacks/src/stacks/specs/paradedb.yaml b/tembo-stacks/src/stacks/specs/paradedb.yaml
index 49a3bc242..2b91b889a 100644
--- a/tembo-stacks/src/stacks/specs/paradedb.yaml
+++ b/tembo-stacks/src/stacks/specs/paradedb.yaml
@@ -38,8 +38,6 @@ trunk_installs:
     version: 1.6.2
   - name: pgvector
     version: 0.7.4
-  - name: vectorscale
-    version: 0.3.0
   - name: pg_ivm
     version: 1.9.0
 extensions:
@@ -69,11 +67,6 @@ extensions:
       - database: postgres
        enabled: true
         version: 0.1.2
-  - name: vectorscale
-    locations:
-      - database: postgres
-        enabled: true
-        version: 0.3.0
   - name: pg_ivm
     locations:
       - database: postgres