Skip to content

Commit

Permalink
update function signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
Jayko001 committed Jul 3, 2024
1 parent 344b0e0 commit 01e70dc
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
12 changes: 5 additions & 7 deletions src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ fn basic_setup(base_url: &str) -> Result<(), Box<dyn Error>> {
base_url, "10m"
);

Spi::run(&queries);

let _ = Spi::run(&queries);
Ok(())
}

Expand All @@ -58,8 +57,7 @@ fn create_tables() -> Result<(), Box<dyn Error>> {
) PARTITION BY RANGE (time);
"#;

Spi::run(&queries);

let _ = Spi::run(&queries);
Ok(())
}

Expand All @@ -73,7 +71,7 @@ fn create_indexes() -> Result<(), Box<dyn Error>> {
CREATE INDEX idx_metric_values_label_id ON metric_values (label_id);
"#;

Spi::run(queries);
let _ = Spi::run(queries);
Ok(())
}

Expand All @@ -85,7 +83,7 @@ fn create_partitions(retention_period: &str) -> Result<(), Box<dyn Error>> {
"#;

// Execute the partition setup query
Spi::run(setup_partitioning);
let _ = Spi::run(setup_partitioning);

let setup_retention = format!(
r#"
Expand All @@ -100,6 +98,6 @@ fn create_partitions(retention_period: &str) -> Result<(), Box<dyn Error>> {
);

// Execute the retention setup query
Spi::run(&setup_retention);
let _ = Spi::run(&setup_retention);
Ok(())
}
47 changes: 30 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::runtime::Runtime;
pgrx::pg_module_magic!();
use std::time::Duration;
use urlencoding::encode;
use pgrx::pg_sys::panic::ErrorReport;

// convert response body text to rows
fn resp_to_rows(obj: &str, resp: &JsonValue, quals: &[Qual]) -> Vec<Row> {
Expand Down Expand Up @@ -66,7 +67,8 @@ fn resp_to_rows(obj: &str, resp: &JsonValue, quals: &[Qual]) -> Vec<Row> {
#[wrappers_fdw(
version = "0.1.5",
author = "Jay Kothari",
website = "https://tembo.io"
website = "https://tembo.io",
error_type = "PrometheusFdwError"
)]

pub(crate) struct PrometheusFdw {
Expand All @@ -80,6 +82,16 @@ pub(crate) struct PrometheusFdw {
tgt_cols: Vec<Column>,
}

enum PrometheusFdwError {}

impl From<PrometheusFdwError> for ErrorReport {
fn from(_value: PrometheusFdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "")
}
}

type PrometheusFdwResult<T> = Result<T, PrometheusFdwError>;

impl PrometheusFdw {
fn value_to_promql_string(value: &supabase_wrappers::interface::Value) -> String {
match value {
Expand Down Expand Up @@ -147,10 +159,10 @@ impl PrometheusFdw {
}
}

impl ForeignDataWrapper for PrometheusFdw {
fn new(options: &HashMap<String, String>) -> Self {
impl ForeignDataWrapper<PrometheusFdwError> for PrometheusFdw {
fn new(options: &HashMap<String, String>) -> PrometheusFdwResult<Self> {
let mut ret = Self {
rt: create_async_runtime(),
rt: create_async_runtime().expect("failed to create async runtime"),
base_url: None,
username: None,
password: None,
Expand All @@ -176,7 +188,7 @@ impl ForeignDataWrapper for PrometheusFdw {
.expect("Failed to build client"),
);

ret
Ok(ret)
}

fn begin_scan(
Expand All @@ -186,11 +198,8 @@ impl ForeignDataWrapper for PrometheusFdw {
_sorts: &[Sort],
_limit: &Option<Limit>,
options: &HashMap<String, String>,
) {
let obj = match require_option("object", options) {
Some(obj) => obj,
None => return,
};
) -> PrometheusFdwResult<()> {
let obj = require_option("object", options).expect("invalid option");

self.scan_result = None;
self.tgt_cols = columns.to_vec();
Expand All @@ -200,7 +209,7 @@ impl ForeignDataWrapper for PrometheusFdw {

if obj == "metrics" {
let url = self.build_url(&obj, options, quals);
let mut resp = None;
let resp;

if let Some(bearer_token) = &self.bearer_token {
// Create a RequestBuilder and set the bearer token
Expand Down Expand Up @@ -242,28 +251,32 @@ impl ForeignDataWrapper for PrometheusFdw {

self.scan_result = Some(result);
}
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> Option<()> {
fn iter_scan(&mut self, row: &mut Row) -> PrometheusFdwResult<Option<()>> {
if let Some(ref mut result) = self.scan_result {
if !result.is_empty() {
return result.drain(0..1).last().map(|src_row| {
let scanned = result.drain(0..1).last().map(|src_row| {
row.replace_with(src_row);
});
return Ok(scanned);
}
}
None
Ok(None)
}

fn end_scan(&mut self) {
fn end_scan(&mut self) -> PrometheusFdwResult<()> {
self.scan_result.take();
Ok(())
}

fn validator(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) {
fn validator(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) -> PrometheusFdwResult<()> {
if let Some(oid) = catalog {
if oid == FOREIGN_TABLE_RELATION_ID {
check_options_contain(&options, "object");
let _ =check_options_contain(&options, "object");
}
}
Ok(())
}
}

0 comments on commit 01e70dc

Please sign in to comment.