diff --git a/.github/workflows/extension_ci.yml b/.github/workflows/extension_ci.yml index f9b23c6..a4da7f1 100644 --- a/.github/workflows/extension_ci.yml +++ b/.github/workflows/extension_ci.yml @@ -81,6 +81,9 @@ jobs: if: github.event_name == 'release' name: trunk publish runs-on: ubuntu-22.04 + strategy: + matrix: + pg-version: [14, 15, 16] steps: - uses: actions/checkout@v2 - name: Install Rust stable toolchain @@ -103,7 +106,7 @@ jobs: cargo install pg-trunk - name: trunk build working-directory: ./ - run: trunk build --pg-version 15 + run: trunk build --pg-version ${{ matrix.pg-version }} - name: trunk publish working-directory: ./ env: diff --git a/Cargo.toml b/Cargo.toml index b105e59..90bbc01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,37 +1,27 @@ [package] name = "clerk_fdw" -version = "0.2.8" +version = "0.3.0" edition = "2021" +publish = false [lib] crate-type = ["cdylib"] [features] -default = ["pg15"] -pg11 = ["pgrx/pg11", "pgrx-tests/pg11"] -pg12 = ["pgrx/pg12", "pgrx-tests/pg12"] -pg13 = ["pgrx/pg13", "pgrx-tests/pg13"] -pg14 = ["pgrx/pg14", "pgrx-tests/pg14"] -pg15 = ["pgrx/pg15", "pgrx-tests/pg15"] +default = ["pg16"] +pg14 = ["pgrx/pg14", "pgrx-tests/pg14", "supabase-wrappers/pg14"] +pg15 = ["pgrx/pg15", "pgrx-tests/pg15", "supabase-wrappers/pg15"] +pg16 = ["pgrx/pg16", "pgrx-tests/pg16", "supabase-wrappers/pg16"] pg_test = [] [dependencies] chrono = "0.4.26" clerk-rs = "0.3.0" -pgrx = "=0.9.7" +pgrx = "=0.11.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -supabase-wrappers = "=0.1.15" +supabase-wrappers = { git = "https://github.com/supabase/wrappers.git", default-features = false} tokio = { version = "1", features = ["full"] } [dev-dependencies] -pgrx-tests = "=0.9.7" - -[profile.dev] -panic = "unwind" - -[profile.release] -panic = "unwind" -opt-level = 3 -lto = "fat" -codegen-units = 1 +pgrx-tests = "=0.11.3" \ No newline at end of file diff --git a/Makefile b/Makefile index 37bf7d7..e8a8f52 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -PGRX_POSTGRES ?= pg15 +PGRX_POSTGRES ?= pg16 DISTNAME = $(shell grep -m 1 '^name' Trunk.toml | sed -e 's/[^"]*"\([^"]*\)",\{0,1\}/\1/') DISTVERSION = $(shell grep -m 1 '^version' Trunk.toml | sed -e 's/[^"]*"\([^"]*\)",\{0,1\}/\1/') diff --git a/Trunk.toml b/Trunk.toml index d53556a..7799a6c 100644 --- a/Trunk.toml +++ b/Trunk.toml @@ -6,7 +6,7 @@ description = "Postgres Foreign Data Wrapper for Clerk.com Backend API" homepage = "https://github.com/tembo-io/clerk_fdw" documentation = "https://github.com/tembo-io/clerk_fdw" categories = ["connectors"] -version = "0.2.8" +version = "0.3.0" [build] postgres_version = "15" diff --git a/src/lib.rs b/src/lib.rs index dc7b680..71bab00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,8 @@ +use pgrx::pg_sys::panic::ErrorReport; use pgrx::warning; +use pgrx::PgSqlErrorCode; use pgrx::{pg_sys, prelude::*, JsonB}; + use std::collections::HashMap; use std::env; use tokio::runtime::Runtime; @@ -155,11 +158,11 @@ fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec { } #[wrappers_fdw( - version = "0.2.8", - author = "Jay Kothari", - website = "https://tembo.io" + version = "0.3.0", + author = "Tembo.io", + website = "https://tembo.io", + error_type = "ClerkFdwError" )] - pub(crate) struct ClerkFdw { rt: Runtime, scan_result: Option>, @@ -167,8 +170,18 @@ pub(crate) struct ClerkFdw { clerk_client: Clerk, } -impl ForeignDataWrapper for ClerkFdw { - fn new(options: &HashMap) -> Self { +enum ClerkFdwError {} + +impl From for ErrorReport { + fn from(_value: ClerkFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "") + } +} + +type ClerkFdwResult = Result; + +impl ForeignDataWrapper for ClerkFdw { + fn new(options: &HashMap) -> ClerkFdwResult { let token = if let Some(access_token) = options.get("api_key") { access_token.to_owned() } else { @@ -182,13 +195,13 @@ impl ForeignDataWrapper for ClerkFdw { Some(token.to_string()), None, )); - - Self { - rt: create_async_runtime(), + let rt = create_async_runtime().expect("failed to create async runtime"); + Ok(Self { + rt, tgt_cols: Vec::new(), scan_result: None, clerk_client, - } + }) } fn begin_scan( @@ -198,17 +211,14 @@ impl ForeignDataWrapper for ClerkFdw { _sorts: &[Sort], _limit: &Option, options: &HashMap, - ) { - let obj = match require_option("object", options) { - Some(obj) => obj, - None => return, - }; + ) -> ClerkFdwResult<()> { + let obj = require_option("object", options).expect("invalid option"); self.scan_result = None; self.tgt_cols = columns.to_vec(); let mut result = Vec::new(); - self.rt.block_on(async { + let run = self.rt.block_on(async { if obj == "organization_memberships" { // Get all organizations first let mut offset: f32 = 0.0; @@ -221,8 +231,10 @@ impl ForeignDataWrapper for ClerkFdw { None, ) .await; - if let Ok(org_res) = org_resp { + let total_orgs = org_res.data.len(); + info!("clerk_fdw: received total organizations: {}", total_orgs); + let mut i_org = 0; for org in org_res.data.iter() { let membership_resp = OrganizationMembership::list_organization_memberships( @@ -235,8 +247,12 @@ impl ForeignDataWrapper for ClerkFdw { match membership_resp { Ok(mem_res) => { + i_org += 1; + if i_org % 50 == 0 { + info!("clerk_fdw: received memberships for organization: {}/{}", i_org, total_orgs); + } let serde_v = serde_json::to_value(mem_res).unwrap(); - let mut rows = resp_to_rows(&obj, &serde_v, &self.tgt_cols[..]); + let mut rows = resp_to_rows(obj, &serde_v, &self.tgt_cols[..]); result.append(&mut rows); } Err(e) => { @@ -252,9 +268,11 @@ impl ForeignDataWrapper for ClerkFdw { std::thread::sleep(std::time::Duration::from_millis(50)); } if org_res.data.len() < PAGE_SIZE { + info!("clerk_fdw: finished fetching all memberships, total={}", result.len()); break; } else { offset += PAGE_SIZE as f32; + info!("clerk_fdw: fetching more organizations, offset={}", offset); } } else { warning!("Failed to get organizations. error: {:#?}", org_resp); @@ -265,7 +283,7 @@ impl ForeignDataWrapper for ClerkFdw { let mut offset = 0; loop { let obj_js = - match obj.as_str() { + match obj { "users" => { match User::get_user_list( &self.clerk_client, @@ -312,11 +330,11 @@ impl ForeignDataWrapper for ClerkFdw { } _ => { warning!("unsupported object: {}", obj); - return; + return Err(()); } }; - let mut rows = resp_to_rows(&obj, &obj_js, &self.tgt_cols[..]); + let mut rows = resp_to_rows(obj, &obj_js, &self.tgt_cols[..]); if rows.len() < PAGE_SIZE { result.append(&mut rows); break; @@ -326,31 +344,37 @@ impl ForeignDataWrapper for ClerkFdw { } } } + Ok(()) }); + run.expect("failed to run async block"); self.scan_result = Some(result); + Ok(()) } - fn iter_scan(&mut self, row: &mut Row) -> Option<()> { + fn iter_scan(&mut self, row: &mut Row) -> ClerkFdwResult> { if let Some(ref mut result) = self.scan_result { if !result.is_empty() { - return result + let scanned = result .drain(0..1) .last() .map(|src_row| row.replace_with(src_row)); + return Ok(scanned); } } - None + Ok(None) } - fn end_scan(&mut self) { + fn end_scan(&mut self) -> ClerkFdwResult<()> { self.scan_result.take(); + Ok(()) } - fn validator(options: Vec>, catalog: Option) { + fn validator(options: Vec>, catalog: Option) -> ClerkFdwResult<()> { if let Some(oid) = catalog { if oid == FOREIGN_TABLE_RELATION_ID { - check_options_contain(&options, "object"); + check_options_contain(&options, "object").expect("missing object option"); } } + Ok(()) } }