Skip to content

Commit

Permalink
basic working
Browse files Browse the repository at this point in the history
  • Loading branch information
Jayko001 committed Oct 5, 2023
1 parent 3fff1db commit a950852
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 156 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,50 @@ options (
object 'metric_values'
);
```

CREATE FOREIGN TABLE IF NOT EXISTS metrics (
metric_name TEXT,
metric_labels JSONB,
metric_time BIGINT,
metric_value FLOAT8
)
server my_prometheus_server
options (
object 'metrics'
);


-- Create metric_labels table
CREATE TABLE public.metric_labels_local (
metric_id BIGINT NOT NULL,
metric_name TEXT NOT NULL,
metric_name_label TEXT NOT NULL,
metric_labels JSONB,
PRIMARY KEY (metric_id),
UNIQUE (metric_name, metric_labels)
);

-- Create indexes for metric_labels table
CREATE INDEX metric_labels_labels_idx ON public.metric_labels USING gin (metric_labels);

-- Create partitioned metric_values_local table
CREATE TABLE public.metric_values_local (
metric_id BIGINT NOT NULL,
metric_time BIGINT,
metric_value DOUBLE PRECISION NOT NULL
) PARTITION BY RANGE (metric_time);

-- Create indexes for metric_values table
CREATE INDEX metric_values_id_time_idx ON public.metric_values (metric_id, metric_time DESC);
CREATE INDEX metric_values_time_idx ON public.metric_values (metric_time DESC);

-- You can create a partition of metric_values table for a specific date range like so:
CREATE TABLE public.metric_values_20231002 PARTITION OF public.metric_values
FOR VALUES FROM ('2023-10-02 00:00:00+00') TO ('2023-10-03 00:00:00+00');




SELECT * FROM metric_values WHERE metric_time > 1696046400 AND metric_time < 1696132800;

SELECT
Expand Down
206 changes: 50 additions & 156 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,122 +1,52 @@
use pgrx::warning;
use pgrx::{pg_sys, prelude::*, JsonB};
use reqwest::{self, Client};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::env;
use tokio::runtime::Runtime;
// use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use serde_json::Value as JsonValue;
use std::str::FromStr;
use supabase_wrappers::prelude::*;
use tokio::runtime::Runtime;
pgrx::pg_module_magic!();

fn body_to_rows(
resp: &JsonValue,
obj_key: &str,
normal_cols: Vec<(&str, &str, &str)>,
tgt_cols: &[Column],
) -> Vec<Row> {
let mut result = Vec::new();

let objs = if resp.is_array() {
// If `resp` is directly an array
resp.as_array().unwrap()
} else {
// If `resp` is an object containing the array under `obj_key`
match resp
.as_object()
.and_then(|v| v.get(obj_key))
.and_then(|v| v.as_array())
{
Some(objs) => objs,
None => return result,
}
};

for obj in objs {
let mut row = Row::new();

// extract normal columns
for tgt_col in tgt_cols {
if let Some((src_name, col_name, col_type)) =
normal_cols.iter().find(|(_, c, _)| c == &tgt_col.name)
{
// Navigate through nested properties
let mut current_value: Option<&JsonValue> = Some(obj);
for part in src_name.split('.') {
current_value = current_value.unwrap().as_object().unwrap().get(part);
}

if *src_name == "email_addresses" {
current_value = current_value
.and_then(|v| v.as_array().and_then(|arr| arr.get(0)))
.and_then(|first_obj| {
first_obj
.as_object()
.and_then(|obj| obj.get("email_address"))
});
}

let cell = current_value.and_then(|v| match *col_type {
"bool" => v.as_bool().map(Cell::Bool),
"i64" => v.as_i64().map(Cell::I64),
"string" => v.as_str().map(|a| Cell::String(a.to_owned())),
"timestamp" => v.as_str().map(|a| {
let secs = a.parse::<i64>().unwrap() / 1000;
let ts = to_timestamp(secs as f64);
Cell::Timestamp(ts.to_utc())
}),
"timestamp_iso" => v.as_str().map(|a| {
let ts = Timestamp::from_str(a).unwrap();
Cell::Timestamp(ts)
}),
"json" => Some(Cell::Json(JsonB(v.clone()))),
_ => None,
});
row.push(col_name, cell);
}
}

// put all properties into 'attrs' JSON column
if tgt_cols.iter().any(|c| &c.name == "attrs") {
let attrs = serde_json::from_str(&obj.to_string()).unwrap();
row.push("attrs", Some(Cell::Json(JsonB(attrs))));
}

result.push(row);
}
result
}

// convert response body text to rows
fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec<Row> {
fn resp_to_rows(obj: &str, resp: &JsonValue) -> Vec<Row> {
let mut result = Vec::new();

match obj {
"metric_labels" => {
result = body_to_rows(
resp,
"data",
vec![
("id", "metric_id", "i64"),
("metric_name", "metric_name", "string"),
("metric_name_label", "metric_name_label", "string"),
("metric_labels", "metric_labels", "json"),
],
tgt_cols,
);
}
"metric_values" => {
result = body_to_rows(
resp,
"data",
vec![
("id", "metric_id", "i64"),
("timestamp", "timestamp", "i64"),
("value", "value", "i64"),
],
tgt_cols,
);
"metrics" => {
if let Some(result_array) = resp["data"]["result"].as_array() {
for result_obj in result_array {
let metric_name = result_obj["metric"]["__name__"]
.as_str()
.unwrap_or_default()
.to_string();
let metric_labels = result_obj["metric"].clone();
if let Some(values_array) = result_obj["values"].as_array() {
for value_pair in values_array {
if let (Some(time_str), Some(value_str)) =
(value_pair[0].as_i64(), value_pair[1].as_str())
{
if let (metric_time, Ok(metric_value)) =
(time_str, value_str.parse::<f64>())
{
let mut row = Row::new();
row.push(
"metric_name",
Some(Cell::String(metric_name.clone())),
);
row.push(
"metric_labels",
Some(Cell::Json(JsonB(metric_labels.clone()))),
);
row.push("metric_time", Some(Cell::I64(metric_time)));
row.push("metric_value", Some(Cell::F64(metric_value)));
result.push(row);
}
}
}
}
}
}
}
_ => {
warning!("unsupported object: {}", obj);
Expand All @@ -141,22 +71,7 @@ pub(crate) struct PrometheusFdw {

impl PrometheusFdw {
const DEFAULT_BASE_URL: &'static str =
"https://prometheus-control-1.use1.dev.plat.cdb-svc.com/";

fn map_operator(op: &str) -> &str {
match op {
"=" => "=\"",
"!=" => "!=\"",
">" => ">\"",
"<" => "<\"",
">=" => ">=\"",
"<=" => "<=\"",
_ => {
println!("unsupported operator: {}", op);
"\""
}
}
}
"https://prometheus-control-1.use1.dev.plat.cdb-svc.com/api/v1/query";

fn value_to_promql_string(value: &supabase_wrappers::interface::Value) -> String {
match value {
Expand Down Expand Up @@ -215,28 +130,11 @@ impl PrometheusFdw {
}

fn build_url(&self, obj: &str, _options: &HashMap<String, String>, quals: &[Qual]) -> String {
let base_url = "https://prometheus-control-1.use1.dev.plat.cdb-svc.com/api/v1/query";

match obj {
"metric_labels" => {
// Find the metric_name filter from quals
"metrics" => {
let metric_name_filter = quals
.iter()
.find(|qual| qual.field == "metric_name" && qual.operator == "=");

// If a metric_name filter is found, build the query URL
if let Some(metric_name_qual) = metric_name_filter {
let metric_name = Self::value_to_promql_string(&metric_name_qual.value);
let ret = format!("{}?query={}", base_url, metric_name);
warning!("inside metric_labels");
warning!("Constructed URL: {}", ret);
ret
} else {
println!("No metric_name filter found in quals");
"".to_string()
}
}
"metric_values" => {
let lower_timestamp = quals
.iter()
.find(|qual| qual.field == "metric_time" && qual.operator == ">");
Expand All @@ -245,26 +143,27 @@ impl PrometheusFdw {
.iter()
.find(|qual| qual.field == "metric_time" && qual.operator == "<");

// If both lower and upper timestamp filters are found, build the query URL
if let (Some(lower_timestamp), Some(upper_timestamp)) =
(lower_timestamp, upper_timestamp)
if let (Some(metric_name), Some(lower_timestamp), Some(upper_timestamp)) =
(metric_name_filter, lower_timestamp, upper_timestamp)
{
let metric_name = Self::value_to_promql_string(&metric_name.value);
let lower_timestamp = Self::value_to_promql_string(&lower_timestamp.value);
let upper_timestamp = Self::value_to_promql_string(&upper_timestamp.value);
let ret = format!(
"{}_range?query=container_threads&start={}&end={}&step=1m",
base_url, lower_timestamp, upper_timestamp
"{}_range?query={}&start={}&end={}&step=10m",
Self::DEFAULT_BASE_URL,
metric_name,
lower_timestamp,
upper_timestamp
);
warning!("inside metric_values");
warning!("Constructed URL: {}", ret);
ret
} else {
println!("Timestamp filters not found in quals");
"".to_string()
}
}
_ => {
println!("unsupported object: {:#?}", obj);
println!("Unsupported object: {}", obj);
"".to_string()
}
}
Expand Down Expand Up @@ -304,14 +203,12 @@ impl ForeignDataWrapper for PrometheusFdw {

self.scan_result = None;
self.tgt_cols = columns.to_vec();
let api_key = "".to_string();

if let Some(client) = &self.client {
let mut result = Vec::new();

if obj == "metric_labels" || obj == "metric_values" {
if obj == "metrics" {
let url = self.build_url(&obj, options, quals);
warning!("url: {}", url);

let resp = self.rt.block_on(async { client.get(&url).send().await });

Expand All @@ -320,11 +217,8 @@ impl ForeignDataWrapper for PrometheusFdw {
let body = self.rt.block_on(async { resp.text().await });
match body {
Ok(body) => {
// warning!("body: {}", body);
let json: JsonValue = serde_json::from_str(&body).unwrap();
// warning!("json: {:#?}", json[0]);
result = resp_to_rows(&obj, &json, columns);
// warning!("result: {:#?}", result);
result = resp_to_rows(&obj, &json);
}
Err(e) => {
warning!("failed to get body: {}", e);
Expand Down

0 comments on commit a950852

Please sign in to comment.