diff --git a/README.md b/README.md index 95e1e57..1e08fd4 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,50 @@ options ( object 'metric_values' ); ``` + +CREATE FOREIGN TABLE IF NOT EXISTS metrics ( + metric_name TEXT, + metric_labels JSONB, + metric_time BIGINT, + metric_value FLOAT8 + ) +server my_prometheus_server +options ( + object 'metrics' +); + + +-- Create metric_labels table +CREATE TABLE public.metric_labels_local ( + metric_id BIGINT NOT NULL, + metric_name TEXT NOT NULL, + metric_name_label TEXT NOT NULL, + metric_labels JSONB, + PRIMARY KEY (metric_id), + UNIQUE (metric_name, metric_labels) +); + +-- Create indexes for metric_labels table +CREATE INDEX metric_labels_labels_idx ON public.metric_labels USING gin (metric_labels); + +-- Create partitioned metric_values_local table +CREATE TABLE public.metric_values_local ( + metric_id BIGINT NOT NULL, + metric_time BIGINT, + metric_value DOUBLE PRECISION NOT NULL +) PARTITION BY RANGE (metric_time); + +-- Create indexes for metric_values table +CREATE INDEX metric_values_id_time_idx ON public.metric_values (metric_id, metric_time DESC); +CREATE INDEX metric_values_time_idx ON public.metric_values (metric_time DESC); + +-- You can create a partition of metric_values table for a specific date range like so: +CREATE TABLE public.metric_values_20231002 PARTITION OF public.metric_values + FOR VALUES FROM ('2023-10-02 00:00:00+00') TO ('2023-10-03 00:00:00+00'); + + + + SELECT * FROM metric_values WHERE metric_time > 1696046400 AND metric_time < 1696132800; SELECT diff --git a/src/lib.rs b/src/lib.rs index 1535f2e..6ab532e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,122 +1,52 @@ use pgrx::warning; use pgrx::{pg_sys, prelude::*, JsonB}; use reqwest::{self, Client}; +use serde_json::Value as JsonValue; use std::collections::HashMap; use std::env; -use tokio::runtime::Runtime; -// use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; -use serde_json::Value as JsonValue; -use std::str::FromStr; use supabase_wrappers::prelude::*; +use tokio::runtime::Runtime; pgrx::pg_module_magic!(); -fn body_to_rows( - resp: &JsonValue, - obj_key: &str, - normal_cols: Vec<(&str, &str, &str)>, - tgt_cols: &[Column], -) -> Vec { - let mut result = Vec::new(); - - let objs = if resp.is_array() { - // If `resp` is directly an array - resp.as_array().unwrap() - } else { - // If `resp` is an object containing the array under `obj_key` - match resp - .as_object() - .and_then(|v| v.get(obj_key)) - .and_then(|v| v.as_array()) - { - Some(objs) => objs, - None => return result, - } - }; - - for obj in objs { - let mut row = Row::new(); - - // extract normal columns - for tgt_col in tgt_cols { - if let Some((src_name, col_name, col_type)) = - normal_cols.iter().find(|(_, c, _)| c == &tgt_col.name) - { - // Navigate through nested properties - let mut current_value: Option<&JsonValue> = Some(obj); - for part in src_name.split('.') { - current_value = current_value.unwrap().as_object().unwrap().get(part); - } - - if *src_name == "email_addresses" { - current_value = current_value - .and_then(|v| v.as_array().and_then(|arr| arr.get(0))) - .and_then(|first_obj| { - first_obj - .as_object() - .and_then(|obj| obj.get("email_address")) - }); - } - - let cell = current_value.and_then(|v| match *col_type { - "bool" => v.as_bool().map(Cell::Bool), - "i64" => v.as_i64().map(Cell::I64), - "string" => v.as_str().map(|a| Cell::String(a.to_owned())), - "timestamp" => v.as_str().map(|a| { - let secs = a.parse::().unwrap() / 1000; - let ts = to_timestamp(secs as f64); - Cell::Timestamp(ts.to_utc()) - }), - "timestamp_iso" => v.as_str().map(|a| { - let ts = Timestamp::from_str(a).unwrap(); - Cell::Timestamp(ts) - }), - "json" => Some(Cell::Json(JsonB(v.clone()))), - _ => None, - }); - row.push(col_name, cell); - } - } - - // put all properties into 'attrs' JSON column - if tgt_cols.iter().any(|c| &c.name == "attrs") { - let attrs = serde_json::from_str(&obj.to_string()).unwrap(); - row.push("attrs", Some(Cell::Json(JsonB(attrs)))); - } - - result.push(row); - } - result -} - // convert response body text to rows -fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec { +fn resp_to_rows(obj: &str, resp: &JsonValue) -> Vec { let mut result = Vec::new(); match obj { - "metric_labels" => { - result = body_to_rows( - resp, - "data", - vec![ - ("id", "metric_id", "i64"), - ("metric_name", "metric_name", "string"), - ("metric_name_label", "metric_name_label", "string"), - ("metric_labels", "metric_labels", "json"), - ], - tgt_cols, - ); - } - "metric_values" => { - result = body_to_rows( - resp, - "data", - vec![ - ("id", "metric_id", "i64"), - ("timestamp", "timestamp", "i64"), - ("value", "value", "i64"), - ], - tgt_cols, - ); + "metrics" => { + if let Some(result_array) = resp["data"]["result"].as_array() { + for result_obj in result_array { + let metric_name = result_obj["metric"]["__name__"] + .as_str() + .unwrap_or_default() + .to_string(); + let metric_labels = result_obj["metric"].clone(); + if let Some(values_array) = result_obj["values"].as_array() { + for value_pair in values_array { + if let (Some(time_str), Some(value_str)) = + (value_pair[0].as_i64(), value_pair[1].as_str()) + { + if let (metric_time, Ok(metric_value)) = + (time_str, value_str.parse::()) + { + let mut row = Row::new(); + row.push( + "metric_name", + Some(Cell::String(metric_name.clone())), + ); + row.push( + "metric_labels", + Some(Cell::Json(JsonB(metric_labels.clone()))), + ); + row.push("metric_time", Some(Cell::I64(metric_time))); + row.push("metric_value", Some(Cell::F64(metric_value))); + result.push(row); + } + } + } + } + } + } } _ => { warning!("unsupported object: {}", obj); @@ -141,22 +71,7 @@ pub(crate) struct PrometheusFdw { impl PrometheusFdw { const DEFAULT_BASE_URL: &'static str = - "https://prometheus-control-1.use1.dev.plat.cdb-svc.com/"; - - fn map_operator(op: &str) -> &str { - match op { - "=" => "=\"", - "!=" => "!=\"", - ">" => ">\"", - "<" => "<\"", - ">=" => ">=\"", - "<=" => "<=\"", - _ => { - println!("unsupported operator: {}", op); - "\"" - } - } - } + "https://prometheus-control-1.use1.dev.plat.cdb-svc.com/api/v1/query"; fn value_to_promql_string(value: &supabase_wrappers::interface::Value) -> String { match value { @@ -215,28 +130,11 @@ impl PrometheusFdw { } fn build_url(&self, obj: &str, _options: &HashMap, quals: &[Qual]) -> String { - let base_url = "https://prometheus-control-1.use1.dev.plat.cdb-svc.com/api/v1/query"; - match obj { - "metric_labels" => { - // Find the metric_name filter from quals + "metrics" => { let metric_name_filter = quals .iter() .find(|qual| qual.field == "metric_name" && qual.operator == "="); - - // If a metric_name filter is found, build the query URL - if let Some(metric_name_qual) = metric_name_filter { - let metric_name = Self::value_to_promql_string(&metric_name_qual.value); - let ret = format!("{}?query={}", base_url, metric_name); - warning!("inside metric_labels"); - warning!("Constructed URL: {}", ret); - ret - } else { - println!("No metric_name filter found in quals"); - "".to_string() - } - } - "metric_values" => { let lower_timestamp = quals .iter() .find(|qual| qual.field == "metric_time" && qual.operator == ">"); @@ -245,18 +143,19 @@ impl PrometheusFdw { .iter() .find(|qual| qual.field == "metric_time" && qual.operator == "<"); - // If both lower and upper timestamp filters are found, build the query URL - if let (Some(lower_timestamp), Some(upper_timestamp)) = - (lower_timestamp, upper_timestamp) + if let (Some(metric_name), Some(lower_timestamp), Some(upper_timestamp)) = + (metric_name_filter, lower_timestamp, upper_timestamp) { + let metric_name = Self::value_to_promql_string(&metric_name.value); let lower_timestamp = Self::value_to_promql_string(&lower_timestamp.value); let upper_timestamp = Self::value_to_promql_string(&upper_timestamp.value); let ret = format!( - "{}_range?query=container_threads&start={}&end={}&step=1m", - base_url, lower_timestamp, upper_timestamp + "{}_range?query={}&start={}&end={}&step=10m", + Self::DEFAULT_BASE_URL, + metric_name, + lower_timestamp, + upper_timestamp ); - warning!("inside metric_values"); - warning!("Constructed URL: {}", ret); ret } else { println!("Timestamp filters not found in quals"); @@ -264,7 +163,7 @@ impl PrometheusFdw { } } _ => { - println!("unsupported object: {:#?}", obj); + println!("Unsupported object: {}", obj); "".to_string() } } @@ -304,14 +203,12 @@ impl ForeignDataWrapper for PrometheusFdw { self.scan_result = None; self.tgt_cols = columns.to_vec(); - let api_key = "".to_string(); if let Some(client) = &self.client { let mut result = Vec::new(); - if obj == "metric_labels" || obj == "metric_values" { + if obj == "metrics" { let url = self.build_url(&obj, options, quals); - warning!("url: {}", url); let resp = self.rt.block_on(async { client.get(&url).send().await }); @@ -320,11 +217,8 @@ impl ForeignDataWrapper for PrometheusFdw { let body = self.rt.block_on(async { resp.text().await }); match body { Ok(body) => { - // warning!("body: {}", body); let json: JsonValue = serde_json::from_str(&body).unwrap(); - // warning!("json: {:#?}", json[0]); - result = resp_to_rows(&obj, &json, columns); - // warning!("result: {:#?}", result); + result = resp_to_rows(&obj, &json); } Err(e) => { warning!("failed to get body: {}", e);