From db18752775573c444f64f18d9e07d5fe5034fc0f Mon Sep 17 00:00:00 2001 From: Jay Kothari Date: Fri, 5 Jul 2024 13:40:08 +0530 Subject: [PATCH] Use orb-billing SDK (#11) * basic setup for using the sdk * customers table working * remove unused dependencies and functions * subscription table working * add invoice table * run cargo clippy -D warnings in CI test * fix type * bump version --- .github/workflows/extension_ci.yml | 4 +- Cargo.lock | 119 +------- Cargo.toml | 12 +- README.md | 18 +- Trunk.toml | 2 +- ...-0.11.3.sql => orb_fdw--0.0.4--0.11.3.sql} | 0 sql/orb_fdw--0.11.3--0.12.0.sql | 0 sql/orb_fdw--0.12.0--0.12.1.sql | 0 src/lib.rs | 280 +++++++++--------- 9 files changed, 160 insertions(+), 275 deletions(-) rename sql/{orb_fw--0.0.4--0.11.3.sql => orb_fdw--0.0.4--0.11.3.sql} (100%) create mode 100644 sql/orb_fdw--0.11.3--0.12.0.sql create mode 100644 sql/orb_fdw--0.12.0--0.12.1.sql diff --git a/.github/workflows/extension_ci.yml b/.github/workflows/extension_ci.yml index ca1b616..328fd47 100644 --- a/.github/workflows/extension_ci.yml +++ b/.github/workflows/extension_ci.yml @@ -40,7 +40,7 @@ jobs: - name: Cargo format run: cargo +nightly fmt --all --check - name: Clippy - run: cargo clippy + run: cargo clippy -- -Dwarnings # figure out how to test work # test: @@ -83,7 +83,7 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - pg: [14, 15, 16] + pg: [ 14, 15, 16 ] steps: - uses: actions/checkout@v4 - name: Install Rust stable toolchain diff --git a/Cargo.lock b/Cargo.lock index 2f0f075..4def486 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -919,10 +919,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" dependencies = [ "cfg-if", - "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", - "wasm-bindgen", ] [[package]] @@ -1200,18 +1198,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "ipnet" version = "2.9.0" @@ -1373,9 +1359,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "mime_guess" -version = "2.0.4" +version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" dependencies = [ "mime", "unicase", @@ -1549,9 +1535,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orb-billing" -version = "0.9.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd0cad922f916817f5b02158bf446258b3e185c272147c356b03a79383fa771b" +checksum = "500b1c4e0a8b0cc4c72da97853846c4c4fc4a384d49090605daea9c1cd0ec883" dependencies = [ "async-stream", "codes-iso-3166", @@ -1572,22 +1558,18 @@ dependencies = [ [[package]] name = "orb_fdw" -version = "0.12.0" +version = "0.12.1" dependencies = [ "futures", - "http", "orb-billing", "pgrx", "pgrx-tests", "reqwest", "reqwest-middleware", - "reqwest-retry", - "serde", "serde_json", "supabase-wrappers", "thiserror", "tokio", - "url", ] [[package]] @@ -1607,17 +1589,6 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.6", -] - [[package]] name = "parking_lot" version = "0.12.2" @@ -1625,21 +1596,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" dependencies = [ "lock_api", - "parking_lot_core 0.9.10", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall 0.2.16", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -2239,15 +2196,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "redox_syscall" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "redox_syscall" version = "0.4.1" @@ -2362,40 +2310,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "reqwest-retry" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6a11c05102e5bec712c0619b8c7b7eda8b21a558a0bd981ceee15c38df8be4" -dependencies = [ - "anyhow", - "async-trait", - "chrono", - "futures", - "getrandom 0.2.14", - "http", - "hyper", - "parking_lot 0.11.2", - "reqwest", - "reqwest-middleware", - "retry-policies", - "task-local-extensions", - "tokio", - "tracing", - "wasm-timer", -] - -[[package]] -name = "retry-policies" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e09bbcb5003282bcb688f0bae741b278e9c7e8f378f561522c9806c58e075d9b" -dependencies = [ - "anyhow", - "chrono", - "rand 0.8.5", -] - [[package]] name = "rustc-demangle" version = "0.1.23" @@ -2783,7 +2697,7 @@ checksum = "f91138e76242f575eb1d3b38b4f1362f10d3a43f47d182a5b359af488a02293b" dependencies = [ "new_debug_unreachable", "once_cell", - "parking_lot 0.12.2", + "parking_lot", "phf_shared 0.10.0", "precomputed-hash", "serde", @@ -3047,7 +2961,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot 0.12.2", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -3089,7 +3003,7 @@ dependencies = [ "futures-channel", "futures-util", "log", - "parking_lot 0.12.2", + "parking_lot", "percent-encoding", "phf 0.11.2", "pin-project-lite", @@ -3460,21 +3374,6 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" -[[package]] -name = "wasm-timer" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f" -dependencies = [ - "futures", - "js-sys", - "parking_lot 0.11.2", - "pin-utils", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "web-sys" version = "0.3.69" diff --git a/Cargo.toml b/Cargo.toml index d3c0aee..e4b258f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "orb_fdw" -version = "0.12.0" +version = "0.12.1" edition = "2021" publish = false @@ -15,20 +15,16 @@ pg16 = ["pgrx/pg16", "pgrx-tests/pg16", "supabase-wrappers/pg16"] pg_test = [] [dependencies] -reqwest-retry = "0.2.2" pgrx = "=0.11.3" -orb-billing = "0.9.0" +orb-billing = "0.11.0" # supabase-wrappers = "=0.1.18" supabase-wrappers = { git = "https://github.com/supabase/wrappers.git", default-features = false, rev = "99242d70eb7e551b700e2db31cc618850d835bc2" } tokio = { version = "1", features = ["full"] } -serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -futures = "0.3.28" thiserror = "1.0.48" -http = "0.2.4" -reqwest = { version = "0.11.4", features = ["json"] } +futures = "0.3.28" reqwest-middleware = "0.2.4" -url = "2.2.2" +reqwest = { version = "0.11.4", features = ["json"] } [dev-dependencies] pgrx-tests = "=0.11.3" diff --git a/README.md b/README.md index fa4e659..457765a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ ## Orb_fdw -This is a simple open-source data wrapper that bridges the gap between your Postgres database and [Orb](https://www.withorb.com/) a leading usage-based billing solution. +This is a simple open-source data wrapper that bridges the gap between your Postgres database +and [Orb](https://www.withorb.com/) a leading usage-based billing solution. [![Tembo Cloud Try Free](https://tembo.io/tryFreeButton.svg)](https://cloud.tembo.io/sign-up) @@ -9,7 +10,7 @@ This is a simple open-source data wrapper that bridges the gap between your Post ### Pre-requisistes -- have the v0.12.0 of `orb_fdw` extension enabled in your instance +- have the v0.12.1 of `orb_fdw` extension enabled in your instance Create the foreign data wrapper: @@ -41,7 +42,8 @@ create foreign table orb_customers ( first_name text, email text, stripe_id text, - created_at text + created_at timestamp, + attrs jsonb ) server my_orb_server options ( @@ -59,8 +61,9 @@ create foreign table orb_subscriptions ( organization_id text, status text, plan text, - started_date text, - end_date text + started_date timestamp, + end_date timestamp, + attrs jsonb ) server my_orb_server options ( @@ -78,8 +81,9 @@ create foreign table orb_invoices ( subscription_id text, organization_id text, status text, - due_date text, - amount text + due_date timestamp, + amount text, + attrs jsonb ) server my_orb_server options ( diff --git a/Trunk.toml b/Trunk.toml index e96b869..0883d20 100644 --- a/Trunk.toml +++ b/Trunk.toml @@ -6,7 +6,7 @@ description = "Postgres Foreign Data Wrapper for withorb.com Backend API" homepage = "https://github.com/tembo-io/orb_fdw" documentation = "https://github.com/tembo-io/orb_fdw" categories = ["connectors"] -version = "0.12.0" +version = "0.12.1" [build] postgres_version = "15" diff --git a/sql/orb_fw--0.0.4--0.11.3.sql b/sql/orb_fdw--0.0.4--0.11.3.sql similarity index 100% rename from sql/orb_fw--0.0.4--0.11.3.sql rename to sql/orb_fdw--0.0.4--0.11.3.sql diff --git a/sql/orb_fdw--0.11.3--0.12.0.sql b/sql/orb_fdw--0.11.3--0.12.0.sql new file mode 100644 index 0000000..e69de29 diff --git a/sql/orb_fdw--0.12.0--0.12.1.sql b/sql/orb_fdw--0.12.0--0.12.1.sql new file mode 100644 index 0000000..e69de29 diff --git a/src/lib.rs b/src/lib.rs index 325727c..6748618 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,69 +9,70 @@ use tokio::runtime::Runtime; pg_module_magic!(); mod orb_fdw; use crate::orb_fdw::OrbFdwError; -use reqwest::{self, header}; -use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; -use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; +use futures::StreamExt; +use orb_billing::{ + Client as OrbClient, ClientConfig as OrbClientConfig, Customer as OrbCustomer, + Invoice as OrbInvoice, InvoiceListParams, ListParams, Subscription as OrbSubscription, + SubscriptionListParams, +}; +// TODO: Remove all unwraps. Handle the errors fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec { - let mut result = Vec::new(); - match obj { "customers" => { - result = body_to_rows( + body_to_rows( resp, - "data", + "data", // This might need to be an empty string if resp is directly an array vec![ ("id", "user_id", "string"), ("external_customer_id", "organization_id", "string"), ("name", "first_name", "string"), ("email", "email", "string"), ("payment_provider_id", "stripe_id", "string"), - ("created_at", "created_at", "string"), - ], - tgt_cols, - ); - } - "subscriptions" => { - result = body_to_rows( - resp, - "data", - vec![ - ("id", "subscription_id", "string"), - ("customer.external_customer_id", "organization_id", "string"), - ("status", "status", "string"), - ("plan.external_plan_id", "plan", "string"), - ( - "current_billing_period_start_date", - "started_date", - "string", - ), - ("current_billing_period_end_date", "end_date", "string"), - ], - tgt_cols, - ); - } - "invoices" => { - result = body_to_rows( - resp, - "data", - vec![ - ("subscription.id", "subscription_id", "string"), - ("customer.id", "customer_id", "string"), - ("customer.external_customer_id", "organization_id", "string"), - ("status", "status", "string"), - ("due_date", "due_date", "string"), - ("amount_due", "amount", "string"), + ("created_at", "created_at", "timestamp_iso"), ], tgt_cols, - ); + ) } + "subscriptions" => body_to_rows( + resp, + "data", + vec![ + ("id", "subscription_id", "string"), + ("customer.external_customer_id", "organization_id", "string"), + ("status", "status", "string"), + ("plan.external_plan_id", "plan", "string"), + ( + "current_billing_period_start_date", + "started_date", + "timestamp_iso", + ), + ( + "current_billing_period_end_date", + "end_date", + "timestamp_iso", + ), + ], + tgt_cols, + ), + "invoices" => body_to_rows( + resp, + "data", + vec![ + ("subscription.id", "subscription_id", "string"), + ("customer.id", "customer_id", "string"), + ("customer.external_customer_id", "organization_id", "string"), + ("status", "status", "string"), + ("invoice_date", "due_date", "timestamp_iso"), + ("amount_due", "amount", "string"), + ], + tgt_cols, + ), _ => { warning!("unsupported object: {}", obj); + Vec::new() } } - - result } fn body_to_rows( @@ -111,16 +112,6 @@ fn body_to_rows( current_value = current_value.unwrap().as_object().unwrap().get(part); } - if *src_name == "email_addresses" { - current_value = current_value - .and_then(|v| v.as_array().and_then(|arr| arr.first())) - .and_then(|first_obj| { - first_obj - .as_object() - .and_then(|obj| obj.get("email_address")) - }); - } - let cell = current_value.and_then(|v| match *col_type { "bool" => v.as_bool().map(Cell::Bool), "i64" => v.as_i64().map(Cell::I64), @@ -153,57 +144,16 @@ fn body_to_rows( } #[wrappers_fdw( - version = "0.12.0", + version = "0.12.1", author = "Jay Kothari", website = "https://github.com/orb_fdw", error_type = "OrbFdwError" )] pub(crate) struct OrbFdw { rt: Runtime, - client: Option, + client: OrbClient, scan_result: Option>, -} - -impl OrbFdw { - // convert response body text to rows - const DEFAULT_BASE_URL: &'static str = "https://api.withorb.com/v1"; - - // TODO: will have to incorporate offset at some point - const PAGE_SIZE: usize = 500; - - fn build_url(&self, obj: &str, cursor: Option) -> String { - let base_url = Self::DEFAULT_BASE_URL.to_owned(); - let cursor_param = if let Some(ref cur) = cursor { - format!("&cursor={}", cur) - } else { - String::new() - }; - - match obj { - "customers" => format!( - "{}/customers?limit={}{}", - base_url, - Self::PAGE_SIZE, - cursor_param - ), - "subscriptions" => format!( - "{}/subscriptions?limit={}{}", - base_url, - Self::PAGE_SIZE, - cursor_param - ), - "invoices" => format!( - "{}/invoices?limit={}{}", - base_url, - Self::PAGE_SIZE, - cursor_param - ), - _ => { - warning!("unsupported object: {:#?}", obj); - "".to_string() - } - } - } + tgt_cols: Vec, } type OrbFdwResult = Result; @@ -215,28 +165,16 @@ impl ForeignDataWrapper for OrbFdw { warning!("Cannot find api_key in options"); env::var("ORB_API_KEY").unwrap() }; - - let mut headers = header::HeaderMap::new(); - let value = format!("Bearer {}", token); - let mut auth_value = header::HeaderValue::from_str(&value) - .map_err(|_| OrbFdwError::InvalidApiKeyHeader) - .unwrap(); - auth_value.set_sensitive(true); - headers.insert(header::AUTHORIZATION, auth_value); - let client = reqwest::Client::builder() - .default_headers(headers) - .build() - .unwrap(); - let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3); - let client = ClientBuilder::new(client) - .with(RetryTransientMiddleware::new_with_policy(retry_policy)) - .build(); - + let client_config = OrbClientConfig { + api_key: token.to_string(), + }; + let orb_client = OrbClient::new(client_config); let rt = create_async_runtime().expect("failed to create async runtime"); Ok(Self { rt, - client: Some(client), + client: orb_client, scan_result: None, + tgt_cols: Vec::new(), }) } @@ -250,41 +188,89 @@ impl ForeignDataWrapper for OrbFdw { ) -> OrbFdwResult<()> { let obj = require_option("object", options).expect("invalid option"); self.scan_result = None; + self.tgt_cols = columns.to_vec(); + let mut result = Vec::new(); - if let Some(client) = &self.client { - let mut result = Vec::new(); - let mut cursor: Option = None; + let run: Result<(), Box> = self.rt.block_on(async { + let obj_js = match obj { + "customers" => { + let customers_stream = self + .client + .list_customers(&ListParams::DEFAULT.page_size(400)); + let customers = customers_stream.collect::>().await; - loop { - let url = self.build_url(obj, cursor.clone()); // Ensure build_url handles None as initial cursor - let body = self - .rt - .block_on(client.get(&url).send()) - .and_then(|resp| { - resp.error_for_status() - .and_then(|resp| self.rt.block_on(resp.text())) - .map_err(reqwest_middleware::Error::from) - }) - .unwrap(); + // Process the Vec> + let processed_customers: Vec = customers + .into_iter() + .filter_map(|customer_result| match customer_result { + Ok(customer) => Some(customer), + Err(e) => { + warning!("Error processing customer: {}", e); + None + } + }) + .collect(); - let json: serde_json::Value = serde_json::from_str(&body).unwrap(); - let rows = resp_to_rows(obj, &json, columns); // Assuming this function exists and works as intended - if rows.is_empty() { - break; + info!("Found {} customers in Orb", processed_customers.len()); + serde_json::to_value(processed_customers).expect("failed deserializing users") } - result.append(&mut rows.clone()); - cursor = json - .get("pagination_metadata") - .and_then(|pm| pm.get("next_cursor")) - .and_then(|nc| nc.as_str()) - .map(String::from); - // Break if there is no next cursor - if cursor.is_none() { - break; + "subscriptions" => { + let subscriptions_stream = self + .client + .list_subscriptions(&SubscriptionListParams::DEFAULT.page_size(400)); + let subscriptions = subscriptions_stream.collect::>().await; + + let processed_subscriptions: Vec = subscriptions + .into_iter() + .filter_map(|customer_result| match customer_result { + Ok(customer) => Some(customer), + Err(e) => { + warning!("Error processing customer: {}", e); + None + } + }) + .collect(); + + info!( + "Found {} subscriptions in Orb", + processed_subscriptions.len() + ); + serde_json::to_value(processed_subscriptions) + .expect("failed deserializing users") } - } - self.scan_result = Some(result); - } + "invoices" => { + let invoices_stream = self + .client + .list_invoices(&InvoiceListParams::DEFAULT.page_size(400)); + let invoices = invoices_stream.collect::>().await; + + let processed_invoices: Vec = invoices + .into_iter() + .filter_map(|customer_result| match customer_result { + Ok(customer) => Some(customer), + Err(e) => { + warning!("Error processing customer: {}", e); + None + } + }) + .collect(); + + info!("Found {} subscriptions in Orb", processed_invoices.len()); + serde_json::to_value(processed_invoices).expect("failed deserializing users") + } + _ => { + warning!("unsupported object: {}", obj); + return Ok(()); + } + }; + + let mut rows = resp_to_rows(obj, &obj_js, &self.tgt_cols[..]); + result.append(&mut rows); + Ok(()) + }); + + run.expect("failed to run async block"); + self.scan_result = Some(result); Ok(()) }