Skip to content

Commit

Permalink
added junction table
Browse files Browse the repository at this point in the history
  • Loading branch information
Jayko001 committed Aug 31, 2023
1 parent 2dad6da commit 26cdc9a
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "clerk_fdw"
version = "0.2.1"
version = "0.2.2"
edition = "2021"

[lib]
Expand Down
180 changes: 124 additions & 56 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use pgrx::warning;
use pgrx::{pg_sys, prelude::*, JsonB};
use reqwest::{self, Client};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -42,25 +41,28 @@ fn body_to_rows(
if let Some((src_name, col_name, col_type)) =
normal_cols.iter().find(|(_, c, _)| c == &tgt_col.name)
{
let cell = obj
.as_object()
.and_then(|v| v.get(*src_name))
.and_then(|v| match *col_type {
"bool" => v.as_bool().map(Cell::Bool),
"i64" => v.as_i64().map(Cell::I64),
"string" => v.as_str().map(|a| Cell::String(a.to_owned())),
"timestamp" => v.as_str().map(|a| {
let secs = a.parse::<i64>().unwrap() / 1000;
let ts = to_timestamp(secs as f64);
Cell::Timestamp(ts.to_utc())
}),
"timestamp_iso" => v.as_str().map(|a| {
let ts = Timestamp::from_str(a).unwrap();
Cell::Timestamp(ts)
}),
"json" => Some(Cell::Json(JsonB(v.clone()))),
_ => None,
});
// Navigate through nested properties
let mut current_value: Option<&JsonValue> = Some(obj);
for part in src_name.split('.') {
current_value = current_value.unwrap().as_object().unwrap().get(part);
}

let cell = current_value.and_then(|v| match *col_type {
"bool" => v.as_bool().map(Cell::Bool),
"i64" => v.as_i64().map(Cell::I64),
"string" => v.as_str().map(|a| Cell::String(a.to_owned())),
"timestamp" => v.as_str().map(|a| {
let secs = a.parse::<i64>().unwrap() / 1000;
let ts = to_timestamp(secs as f64);
Cell::Timestamp(ts.to_utc())
}),
"timestamp_iso" => v.as_str().map(|a| {
let ts = Timestamp::from_str(a).unwrap();
Cell::Timestamp(ts)
}),
"json" => Some(Cell::Json(JsonB(v.clone()))),
_ => None,
});
row.push(col_name, cell);
}
}
Expand Down Expand Up @@ -118,11 +120,10 @@ fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec<Row> {
"junction_table" => {
result = body_to_rows(
resp,
"junction_table",
"data",
vec![
("id", "id", "i64"),
("user_id", "user_id", "string"),
("organization_id", "organization_id", "string"),
("public_user_data.user_id", "user_id", "string"),
("organization.id", "organization_id", "string"),
("role", "role", "string"),
],
tgt_cols,
Expand All @@ -137,7 +138,7 @@ fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec<Row> {
}

#[wrappers_fdw(
version = "0.2.1",
version = "0.2.2",
author = "Jay Kothari",
website = "https://tembo.io"
)]
Expand All @@ -158,9 +159,6 @@ impl ClerkFdw {
// TODO: will have to incorportate offset at some point
const PAGE_SIZE: usize = 500;

// default maximum row count limit
const DEFAULT_ROWS_LIMIT: usize = 10_000;

fn build_url(&self, obj: &str, options: &HashMap<String, String>) -> String {
match obj {
"users" => {
Expand All @@ -177,9 +175,15 @@ impl ClerkFdw {
warning!("junction_table is not supported");

let base_url = Self::DEFAULT_BASE_URL.to_owned();
// let org_id = 'org_2UiCBb861liwYn2jAfDHe7sDrW6';
// let ret = format!("{}/organizations/{}/memberships?limit={}", base_url, org_id, Self::PAGE_SIZE,);
let ret = format!("{}/organizations?limit={}", base_url, Self::PAGE_SIZE,);
let org_id = options
.get("organization_id")
.expect("Organization ID required");
let ret = format!(
"{}/organizations/{}/memberships?limit={}",
base_url,
org_id,
Self::PAGE_SIZE
);
ret
}
_ => {
Expand Down Expand Up @@ -237,33 +241,97 @@ impl ForeignDataWrapper for ClerkFdw {
if let Some(client) = &self.client {
let mut result = Vec::new();

let url = self.build_url(&obj, options);

// this is where i need to make changes
self.rt.block_on(async {
let resp = client
.get(&url)
.header("Authorization", format!("Bearer {}", api_key))
.send()
.await;

match resp {
Ok(mut res) => {
if res.status().is_success() {
let body = res.text().await.unwrap();
let json: JsonValue = serde_json::from_str(&body).unwrap();
let mut rows = resp_to_rows(&obj, &json, &self.tgt_cols[..]);
result.append(&mut rows);
} else {
warning!("Failed request with status: {}", res.status());
if obj == "junction_table" {
// Get all organizations first
let org_url = self.build_url("organizations", options);

self.rt.block_on(async {
let org_resp = client
.get(&org_url)
.header("Authorization", format!("Bearer {}", api_key))
.send()
.await;

if let Ok(mut org_res) = org_resp {
if org_res.status().is_success() {
let org_body = org_res.text().await.unwrap();
let org_json: JsonValue = serde_json::from_str(&org_body).unwrap();

if let Some(org_data) =
org_json.get("data").and_then(|data| data.as_array())
{
for org in org_data {
if let Some(org_id) = org.get("id").and_then(|id| id.as_str()) {
// Build the URL for memberships using org_id
let membership_url = format!(
"{}/organizations/{}/memberships?limit={}",
Self::DEFAULT_BASE_URL,
org_id,
Self::PAGE_SIZE
);

let membership_resp = client
.get(&membership_url)
.header("Authorization", format!("Bearer {}", api_key))
.send()
.await;

match membership_resp {
Ok(mem_res) => {
if mem_res.status().is_success() {
let mem_body = mem_res.text().await.unwrap();
let mem_json: JsonValue =
serde_json::from_str(&mem_body).unwrap();
// info!("mem_json: {:#?}", mem_json);

let mut rows = resp_to_rows(
&obj,
&mem_json,
&self.tgt_cols[..],
);
result.append(&mut rows);
}
}
Err(_) => continue,
};

// Introduce a delay of 0.5 seconds
std::thread::sleep(std::time::Duration::from_millis(500));
}
}
}
}
}
Err(error) => {
warning!("Error: {:#?}", error);
return;
}
};
});
});
} else {
let url = self.build_url(&obj, options);

// this is where i need to make changes
self.rt.block_on(async {
let resp = client
.get(&url)
.header("Authorization", format!("Bearer {}", api_key))
.send()
.await;

match resp {
Ok(mut res) => {
if res.status().is_success() {
let body = res.text().await.unwrap();
let json: JsonValue = serde_json::from_str(&body).unwrap();
let mut rows = resp_to_rows(&obj, &json, &self.tgt_cols[..]);
result.append(&mut rows);
} else {
warning!("Failed request with status: {}", res.status());
}
}
Err(error) => {
warning!("Error: {:#?}", error);
return;
}
};
});
}

self.scan_result = Some(result);
}
Expand Down

0 comments on commit 26cdc9a

Please sign in to comment.