Skip to content

Commit

Permalink
Merge branch 'main' into upgrade-test
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend authored May 30, 2024
2 parents 540ee03 + 36049a5 commit e2f861d
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 49 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/extension_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ jobs:
if: github.event_name == 'release'
name: trunk publish
runs-on: ubuntu-22.04
strategy:
matrix:
pg-version: [14, 15, 16]
steps:
- uses: actions/checkout@v2
- name: Install Rust stable toolchain
Expand All @@ -103,7 +106,7 @@ jobs:
cargo install pg-trunk
- name: trunk build
working-directory: ./
run: trunk build --pg-version 15
run: trunk build --pg-version ${{ matrix.pg-version }}
- name: trunk publish
working-directory: ./
env:
Expand Down
28 changes: 9 additions & 19 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,37 +1,27 @@
[package]
name = "clerk_fdw"
version = "0.2.8"
version = "0.3.0"
edition = "2021"
publish = false

[lib]
crate-type = ["cdylib"]

[features]
default = ["pg15"]
pg11 = ["pgrx/pg11", "pgrx-tests/pg11"]
pg12 = ["pgrx/pg12", "pgrx-tests/pg12"]
pg13 = ["pgrx/pg13", "pgrx-tests/pg13"]
pg14 = ["pgrx/pg14", "pgrx-tests/pg14"]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15"]
default = ["pg16"]
pg14 = ["pgrx/pg14", "pgrx-tests/pg14", "supabase-wrappers/pg14"]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15", "supabase-wrappers/pg15"]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16", "supabase-wrappers/pg16"]
pg_test = []

[dependencies]
chrono = "0.4.26"
clerk-rs = "0.3.0"
pgrx = "=0.9.7"
pgrx = "=0.11.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
supabase-wrappers = "=0.1.15"
supabase-wrappers = { git = "https://github.com/supabase/wrappers.git", default-features = false}
tokio = { version = "1", features = ["full"] }

[dev-dependencies]
pgrx-tests = "=0.9.7"

[profile.dev]
panic = "unwind"

[profile.release]
panic = "unwind"
opt-level = 3
lto = "fat"
codegen-units = 1
pgrx-tests = "=0.11.3"
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PGRX_POSTGRES ?= pg15
PGRX_POSTGRES ?= pg16
DISTNAME = $(shell grep -m 1 '^name' Trunk.toml | sed -e 's/[^"]*"\([^"]*\)",\{0,1\}/\1/')
DISTVERSION = $(shell grep -m 1 '^version' Trunk.toml | sed -e 's/[^"]*"\([^"]*\)",\{0,1\}/\1/')

Expand Down
2 changes: 1 addition & 1 deletion Trunk.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "Postgres Foreign Data Wrapper for Clerk.com Backend API"
homepage = "https://github.com/tembo-io/clerk_fdw"
documentation = "https://github.com/tembo-io/clerk_fdw"
categories = ["connectors"]
version = "0.2.8"
version = "0.3.0"

[build]
postgres_version = "15"
Expand Down
78 changes: 51 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::warning;
use pgrx::PgSqlErrorCode;
use pgrx::{pg_sys, prelude::*, JsonB};

use std::collections::HashMap;
use std::env;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -155,20 +158,30 @@ fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec<Row> {
}

#[wrappers_fdw(
version = "0.2.8",
author = "Jay Kothari",
website = "https://tembo.io"
version = "0.3.0",
author = "Tembo.io",
website = "https://tembo.io",
error_type = "ClerkFdwError"
)]

pub(crate) struct ClerkFdw {
rt: Runtime,
scan_result: Option<Vec<Row>>,
tgt_cols: Vec<Column>,
clerk_client: Clerk,
}

impl ForeignDataWrapper for ClerkFdw {
fn new(options: &HashMap<String, String>) -> Self {
enum ClerkFdwError {}

impl From<ClerkFdwError> for ErrorReport {
fn from(_value: ClerkFdwError) -> Self {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "")
}
}

type ClerkFdwResult<T> = Result<T, ClerkFdwError>;

impl ForeignDataWrapper<ClerkFdwError> for ClerkFdw {
fn new(options: &HashMap<String, String>) -> ClerkFdwResult<Self> {
let token = if let Some(access_token) = options.get("api_key") {
access_token.to_owned()
} else {
Expand All @@ -182,13 +195,13 @@ impl ForeignDataWrapper for ClerkFdw {
Some(token.to_string()),
None,
));

Self {
rt: create_async_runtime(),
let rt = create_async_runtime().expect("failed to create async runtime");
Ok(Self {
rt,
tgt_cols: Vec::new(),
scan_result: None,
clerk_client,
}
})
}

fn begin_scan(
Expand All @@ -198,17 +211,14 @@ impl ForeignDataWrapper for ClerkFdw {
_sorts: &[Sort],
_limit: &Option<Limit>,
options: &HashMap<String, String>,
) {
let obj = match require_option("object", options) {
Some(obj) => obj,
None => return,
};
) -> ClerkFdwResult<()> {
let obj = require_option("object", options).expect("invalid option");

self.scan_result = None;
self.tgt_cols = columns.to_vec();

let mut result = Vec::new();
self.rt.block_on(async {
let run = self.rt.block_on(async {
if obj == "organization_memberships" {
// Get all organizations first
let mut offset: f32 = 0.0;
Expand All @@ -221,8 +231,10 @@ impl ForeignDataWrapper for ClerkFdw {
None,
)
.await;

if let Ok(org_res) = org_resp {
let total_orgs = org_res.data.len();
info!("clerk_fdw: received total organizations: {}", total_orgs);
let mut i_org = 0;
for org in org_res.data.iter() {
let membership_resp =
OrganizationMembership::list_organization_memberships(
Expand All @@ -235,8 +247,12 @@ impl ForeignDataWrapper for ClerkFdw {

match membership_resp {
Ok(mem_res) => {
i_org += 1;
if i_org % 50 == 0 {
info!("clerk_fdw: received memberships for organization: {}/{}", i_org, total_orgs);
}
let serde_v = serde_json::to_value(mem_res).unwrap();
let mut rows = resp_to_rows(&obj, &serde_v, &self.tgt_cols[..]);
let mut rows = resp_to_rows(obj, &serde_v, &self.tgt_cols[..]);
result.append(&mut rows);
}
Err(e) => {
Expand All @@ -252,9 +268,11 @@ impl ForeignDataWrapper for ClerkFdw {
std::thread::sleep(std::time::Duration::from_millis(50));
}
if org_res.data.len() < PAGE_SIZE {
info!("clerk_fdw: finished fetching all memberships, total={}", result.len());
break;
} else {
offset += PAGE_SIZE as f32;
info!("clerk_fdw: fetching more organizations, offset={}", offset);
}
} else {
warning!("Failed to get organizations. error: {:#?}", org_resp);
Expand All @@ -265,7 +283,7 @@ impl ForeignDataWrapper for ClerkFdw {
let mut offset = 0;
loop {
let obj_js =
match obj.as_str() {
match obj {
"users" => {
match User::get_user_list(
&self.clerk_client,
Expand Down Expand Up @@ -312,11 +330,11 @@ impl ForeignDataWrapper for ClerkFdw {
}
_ => {
warning!("unsupported object: {}", obj);
return;
return Err(());
}
};

let mut rows = resp_to_rows(&obj, &obj_js, &self.tgt_cols[..]);
let mut rows = resp_to_rows(obj, &obj_js, &self.tgt_cols[..]);
if rows.len() < PAGE_SIZE {
result.append(&mut rows);
break;
Expand All @@ -326,31 +344,37 @@ impl ForeignDataWrapper for ClerkFdw {
}
}
}
Ok(())
});
run.expect("failed to run async block");
self.scan_result = Some(result);
Ok(())
}

fn iter_scan(&mut self, row: &mut Row) -> Option<()> {
fn iter_scan(&mut self, row: &mut Row) -> ClerkFdwResult<Option<()>> {
if let Some(ref mut result) = self.scan_result {
if !result.is_empty() {
return result
let scanned = result
.drain(0..1)
.last()
.map(|src_row| row.replace_with(src_row));
return Ok(scanned);
}
}
None
Ok(None)
}

fn end_scan(&mut self) {
fn end_scan(&mut self) -> ClerkFdwResult<()> {
self.scan_result.take();
Ok(())
}

fn validator(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) {
fn validator(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) -> ClerkFdwResult<()> {
if let Some(oid) = catalog {
if oid == FOREIGN_TABLE_RELATION_ID {
check_options_contain(&options, "object");
check_options_contain(&options, "object").expect("missing object option");
}
}
Ok(())
}
}

0 comments on commit e2f861d

Please sign in to comment.