From 411e8d0a0422c482ee3fb484de0c6d85eec0faf7 Mon Sep 17 00:00:00 2001 From: Jay Kothari Date: Thu, 8 Feb 2024 13:22:45 -0500 Subject: [PATCH 1/4] add pagination --- Cargo.lock | 2 +- src/lib.rs | 67 ++++++++++++++++++++++++++++++------------------------ 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 781373f..d355b6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1545,7 +1545,7 @@ dependencies = [ [[package]] name = "orb_fdw" -version = "0.0.1" +version = "0.0.2" dependencies = [ "futures", "http", diff --git a/src/lib.rs b/src/lib.rs index b475798..5490248 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -173,26 +173,23 @@ impl OrbFdw { const DEFAULT_ROWS_LIMIT: usize = 10_000; // TODO: will have to incorporate offset at some point - const PAGE_SIZE: usize = 500; + const PAGE_SIZE: usize = 5; - fn build_url(&self, obj: &str, _offset: usize) -> String { + fn build_url(&self, obj: &str, cursor: Option) -> String { let base_url = Self::DEFAULT_BASE_URL.to_owned(); + let cursor_param = if let Some(ref cur) = cursor { + format!("&cursor={}", cur) + } else { + String::new() + }; + match obj { - "customers" => { - let ret = format!("{}/customers?limit={}", base_url, Self::PAGE_SIZE); - ret - } - "subscriptions" => { - let ret = format!("{}/subscriptions?limit={}", base_url, Self::PAGE_SIZE); - ret - } - "invoices" => { - let ret = format! {"{}/invoices?limit={}", base_url, Self::PAGE_SIZE}; - ret - } + "customers" => format!("{}/customers?limit={}{}", base_url, Self::PAGE_SIZE, cursor_param), + "subscriptions" => format!("{}/subscriptions?limit={}{}", base_url, Self::PAGE_SIZE, cursor_param), + "invoices" => format!("{}/invoices?limit={}{}", base_url, Self::PAGE_SIZE, cursor_param), _ => { warning!("unsupported object: {:#?}", obj); - return "".to_string(); + "".to_string() } } } @@ -251,25 +248,35 @@ impl ForeignDataWrapper for OrbFdw { if let Some(client) = &self.client { let mut result = Vec::new(); + let mut cursor: Option = None; - let url = self.build_url(&obj, 0); - - let body = self - .rt - .block_on(client.get(&url).send()) - .and_then(|resp| { - resp.error_for_status() - .and_then(|resp| self.rt.block_on(resp.text())) - .map_err(reqwest_middleware::Error::from) - }) - .unwrap(); - - let json: JsonValue = serde_json::from_str(&body).unwrap(); - let mut rows = resp_to_rows(&obj, &json, columns); - result.append(&mut rows); + loop { + let url = self.build_url(&obj, cursor.clone()); // Ensure build_url handles None as initial cursor + let body = self + .rt + .block_on(client.get(&url).send()) + .and_then(|resp| { + resp.error_for_status() + .and_then(|resp| self.rt.block_on(resp.text())) + .map_err(reqwest_middleware::Error::from) + }) + .unwrap(); + let json: serde_json::Value = serde_json::from_str(&body).unwrap(); + let rows = resp_to_rows(&obj, &json, columns); // Assuming this function exists and works as intended + if rows.is_empty() { + break; + } + result.append(&mut rows.clone()); + cursor = json.get("pagination_metadata").and_then(|pm| pm.get("next_cursor")).and_then(|nc| nc.as_str()).map(String::from); + // Break if there is no next cursor + if cursor.is_none() { + break; + } + } self.scan_result = Some(result); } + } fn iter_scan(&mut self, row: &mut Row) -> Option<()> { From 19435e9c3ef96dd483a9e02f7fc42cb02a6114a3 Mon Sep 17 00:00:00 2001 From: Jay Kothari Date: Thu, 8 Feb 2024 13:24:50 -0500 Subject: [PATCH 2/4] add new version --- Cargo.toml | 2 +- README.md | 2 +- Trunk.toml | 2 +- sql/orb_fdw--0.0.2--0.03.sql | 0 src/lib.rs | 6 +++--- 5 files changed, 6 insertions(+), 6 deletions(-) create mode 100644 sql/orb_fdw--0.0.2--0.03.sql diff --git a/Cargo.toml b/Cargo.toml index 2eb06d2..14c8665 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "orb_fdw" -version = "0.0.2" +version = "0.0.3" edition = "2021" [lib] diff --git a/README.md b/README.md index 4558cf6..6c91f3b 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ This is a simple open-source data wrapper that bridges the gap between your Post ### Pre-requisistes -- have the v0.0.2 of `orb_fdw` extension enabled in your instance +- have the v0.0.3 of `orb_fdw` extension enabled in your instance Create the foreign data wrapper: diff --git a/Trunk.toml b/Trunk.toml index 0d06f8e..ebac124 100644 --- a/Trunk.toml +++ b/Trunk.toml @@ -6,7 +6,7 @@ description = "Postgres Foreign Data Wrapper for withorb.com Backend API" homepage = "https://github.com/tembo-io/orb_fdw" documentation = "https://github.com/tembo-io/orb_fdw" categories = ["connectors"] -version = "0.0.2" +version = "0.0.3" [build] postgres_version = "15" diff --git a/sql/orb_fdw--0.0.2--0.03.sql b/sql/orb_fdw--0.0.2--0.03.sql new file mode 100644 index 0000000..e69de29 diff --git a/src/lib.rs b/src/lib.rs index 5490248..d2a6db9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -153,9 +153,9 @@ fn body_to_rows( } #[wrappers_fdw( - version = "0.0.2", - author = "Jay", - website = "https://github.com/", + version = "0.0.3", + author = "Jay Kothari", + website = "https://github.com/orb_fdw", error_type = "OrbFdwError" )] pub(crate) struct OrbFdw { From 62b942193ab56ffee5b6253af2811f729289b086 Mon Sep 17 00:00:00 2001 From: Jay Kothari Date: Thu, 8 Feb 2024 13:25:58 -0500 Subject: [PATCH 3/4] fix page_size --- Cargo.lock | 2 +- src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d355b6b..f359c63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1545,7 +1545,7 @@ dependencies = [ [[package]] name = "orb_fdw" -version = "0.0.2" +version = "0.0.3" dependencies = [ "futures", "http", diff --git a/src/lib.rs b/src/lib.rs index d2a6db9..9164ef0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -173,7 +173,7 @@ impl OrbFdw { const DEFAULT_ROWS_LIMIT: usize = 10_000; // TODO: will have to incorporate offset at some point - const PAGE_SIZE: usize = 5; + const PAGE_SIZE: usize = 500; fn build_url(&self, obj: &str, cursor: Option) -> String { let base_url = Self::DEFAULT_BASE_URL.to_owned(); From 3144cc1b72c5a426e74fa7155422eb32929267e2 Mon Sep 17 00:00:00 2001 From: Jay Kothari Date: Thu, 8 Feb 2024 13:36:23 -0500 Subject: [PATCH 4/4] fix rust fmt --- src/lib.rs | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9164ef0..02ce3ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -184,9 +184,24 @@ impl OrbFdw { }; match obj { - "customers" => format!("{}/customers?limit={}{}", base_url, Self::PAGE_SIZE, cursor_param), - "subscriptions" => format!("{}/subscriptions?limit={}{}", base_url, Self::PAGE_SIZE, cursor_param), - "invoices" => format!("{}/invoices?limit={}{}", base_url, Self::PAGE_SIZE, cursor_param), + "customers" => format!( + "{}/customers?limit={}{}", + base_url, + Self::PAGE_SIZE, + cursor_param + ), + "subscriptions" => format!( + "{}/subscriptions?limit={}{}", + base_url, + Self::PAGE_SIZE, + cursor_param + ), + "invoices" => format!( + "{}/invoices?limit={}{}", + base_url, + Self::PAGE_SIZE, + cursor_param + ), _ => { warning!("unsupported object: {:#?}", obj); "".to_string() @@ -268,7 +283,11 @@ impl ForeignDataWrapper for OrbFdw { break; } result.append(&mut rows.clone()); - cursor = json.get("pagination_metadata").and_then(|pm| pm.get("next_cursor")).and_then(|nc| nc.as_str()).map(String::from); + cursor = json + .get("pagination_metadata") + .and_then(|pm| pm.get("next_cursor")) + .and_then(|nc| nc.as_str()) + .map(String::from); // Break if there is no next cursor if cursor.is_none() { break; @@ -276,7 +295,6 @@ impl ForeignDataWrapper for OrbFdw { } self.scan_result = Some(result); } - } fn iter_scan(&mut self, row: &mut Row) -> Option<()> {