diff --git a/Cargo.lock b/Cargo.lock index 781373f..f359c63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1545,7 +1545,7 @@ dependencies = [ [[package]] name = "orb_fdw" -version = "0.0.1" +version = "0.0.3" dependencies = [ "futures", "http", diff --git a/Cargo.toml b/Cargo.toml index 2eb06d2..14c8665 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "orb_fdw" -version = "0.0.2" +version = "0.0.3" edition = "2021" [lib] diff --git a/README.md b/README.md index 4558cf6..6c91f3b 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ This is a simple open-source data wrapper that bridges the gap between your Post ### Pre-requisistes -- have the v0.0.2 of `orb_fdw` extension enabled in your instance +- have the v0.0.3 of `orb_fdw` extension enabled in your instance Create the foreign data wrapper: diff --git a/Trunk.toml b/Trunk.toml index 0d06f8e..ebac124 100644 --- a/Trunk.toml +++ b/Trunk.toml @@ -6,7 +6,7 @@ description = "Postgres Foreign Data Wrapper for withorb.com Backend API" homepage = "https://github.com/tembo-io/orb_fdw" documentation = "https://github.com/tembo-io/orb_fdw" categories = ["connectors"] -version = "0.0.2" +version = "0.0.3" [build] postgres_version = "15" diff --git a/sql/orb_fdw--0.0.2--0.03.sql b/sql/orb_fdw--0.0.2--0.03.sql new file mode 100644 index 0000000..e69de29 diff --git a/src/lib.rs b/src/lib.rs index b475798..02ce3ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -153,9 +153,9 @@ fn body_to_rows( } #[wrappers_fdw( - version = "0.0.2", - author = "Jay", - website = "https://github.com/", + version = "0.0.3", + author = "Jay Kothari", + website = "https://github.com/orb_fdw", error_type = "OrbFdwError" )] pub(crate) struct OrbFdw { @@ -175,24 +175,36 @@ impl OrbFdw { // TODO: will have to incorporate offset at some point const PAGE_SIZE: usize = 500; - fn build_url(&self, obj: &str, _offset: usize) -> String { + fn build_url(&self, obj: &str, cursor: Option) -> String { let base_url = Self::DEFAULT_BASE_URL.to_owned(); + let cursor_param = if let Some(ref cur) = cursor { + format!("&cursor={}", cur) + } else { + String::new() + }; + match obj { - "customers" => { - let ret = format!("{}/customers?limit={}", base_url, Self::PAGE_SIZE); - ret - } - "subscriptions" => { - let ret = format!("{}/subscriptions?limit={}", base_url, Self::PAGE_SIZE); - ret - } - "invoices" => { - let ret = format! {"{}/invoices?limit={}", base_url, Self::PAGE_SIZE}; - ret - } + "customers" => format!( + "{}/customers?limit={}{}", + base_url, + Self::PAGE_SIZE, + cursor_param + ), + "subscriptions" => format!( + "{}/subscriptions?limit={}{}", + base_url, + Self::PAGE_SIZE, + cursor_param + ), + "invoices" => format!( + "{}/invoices?limit={}{}", + base_url, + Self::PAGE_SIZE, + cursor_param + ), _ => { warning!("unsupported object: {:#?}", obj); - return "".to_string(); + "".to_string() } } } @@ -251,23 +263,36 @@ impl ForeignDataWrapper for OrbFdw { if let Some(client) = &self.client { let mut result = Vec::new(); + let mut cursor: Option = None; - let url = self.build_url(&obj, 0); - - let body = self - .rt - .block_on(client.get(&url).send()) - .and_then(|resp| { - resp.error_for_status() - .and_then(|resp| self.rt.block_on(resp.text())) - .map_err(reqwest_middleware::Error::from) - }) - .unwrap(); - - let json: JsonValue = serde_json::from_str(&body).unwrap(); - let mut rows = resp_to_rows(&obj, &json, columns); - result.append(&mut rows); + loop { + let url = self.build_url(&obj, cursor.clone()); // Ensure build_url handles None as initial cursor + let body = self + .rt + .block_on(client.get(&url).send()) + .and_then(|resp| { + resp.error_for_status() + .and_then(|resp| self.rt.block_on(resp.text())) + .map_err(reqwest_middleware::Error::from) + }) + .unwrap(); + let json: serde_json::Value = serde_json::from_str(&body).unwrap(); + let rows = resp_to_rows(&obj, &json, columns); // Assuming this function exists and works as intended + if rows.is_empty() { + break; + } + result.append(&mut rows.clone()); + cursor = json + .get("pagination_metadata") + .and_then(|pm| pm.get("next_cursor")) + .and_then(|nc| nc.as_str()) + .map(String::from); + // Break if there is no next cursor + if cursor.is_none() { + break; + } + } self.scan_result = Some(result); } }