Skip to content

Commit

Permalink
Merge pull request #4 from tembo-io/pagination
Browse files Browse the repository at this point in the history
Pagination
  • Loading branch information
Jayko001 authored Feb 8, 2024
2 parents b9f1c6f + 3144cc1 commit 7f90ac1
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "orb_fdw"
version = "0.0.2"
version = "0.0.3"
edition = "2021"

[lib]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This is a simple open-source data wrapper that bridges the gap between your Post

### Pre-requisistes

- have the v0.0.2 of `orb_fdw` extension enabled in your instance
- have the v0.0.3 of `orb_fdw` extension enabled in your instance

Create the foreign data wrapper:

Expand Down
2 changes: 1 addition & 1 deletion Trunk.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "Postgres Foreign Data Wrapper for withorb.com Backend API"
homepage = "https://github.com/tembo-io/orb_fdw"
documentation = "https://github.com/tembo-io/orb_fdw"
categories = ["connectors"]
version = "0.0.2"
version = "0.0.3"

[build]
postgres_version = "15"
Expand Down
Empty file added sql/orb_fdw--0.0.2--0.03.sql
Empty file.
89 changes: 57 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ fn body_to_rows(
}

#[wrappers_fdw(
version = "0.0.2",
author = "Jay",
website = "https://github.com/",
version = "0.0.3",
author = "Jay Kothari",
website = "https://github.com/orb_fdw",
error_type = "OrbFdwError"
)]
pub(crate) struct OrbFdw {
Expand All @@ -175,24 +175,36 @@ impl OrbFdw {
// TODO: will have to incorporate offset at some point
const PAGE_SIZE: usize = 500;

fn build_url(&self, obj: &str, _offset: usize) -> String {
fn build_url(&self, obj: &str, cursor: Option<String>) -> String {
let base_url = Self::DEFAULT_BASE_URL.to_owned();
let cursor_param = if let Some(ref cur) = cursor {
format!("&cursor={}", cur)
} else {
String::new()
};

match obj {
"customers" => {
let ret = format!("{}/customers?limit={}", base_url, Self::PAGE_SIZE);
ret
}
"subscriptions" => {
let ret = format!("{}/subscriptions?limit={}", base_url, Self::PAGE_SIZE);
ret
}
"invoices" => {
let ret = format! {"{}/invoices?limit={}", base_url, Self::PAGE_SIZE};
ret
}
"customers" => format!(
"{}/customers?limit={}{}",
base_url,
Self::PAGE_SIZE,
cursor_param
),
"subscriptions" => format!(
"{}/subscriptions?limit={}{}",
base_url,
Self::PAGE_SIZE,
cursor_param
),
"invoices" => format!(
"{}/invoices?limit={}{}",
base_url,
Self::PAGE_SIZE,
cursor_param
),
_ => {
warning!("unsupported object: {:#?}", obj);
return "".to_string();
"".to_string()
}
}
}
Expand Down Expand Up @@ -251,23 +263,36 @@ impl ForeignDataWrapper for OrbFdw {

if let Some(client) = &self.client {
let mut result = Vec::new();
let mut cursor: Option<String> = None;

let url = self.build_url(&obj, 0);

let body = self
.rt
.block_on(client.get(&url).send())
.and_then(|resp| {
resp.error_for_status()
.and_then(|resp| self.rt.block_on(resp.text()))
.map_err(reqwest_middleware::Error::from)
})
.unwrap();

let json: JsonValue = serde_json::from_str(&body).unwrap();
let mut rows = resp_to_rows(&obj, &json, columns);
result.append(&mut rows);
loop {
let url = self.build_url(&obj, cursor.clone()); // Ensure build_url handles None as initial cursor
let body = self
.rt
.block_on(client.get(&url).send())
.and_then(|resp| {
resp.error_for_status()
.and_then(|resp| self.rt.block_on(resp.text()))
.map_err(reqwest_middleware::Error::from)
})
.unwrap();

let json: serde_json::Value = serde_json::from_str(&body).unwrap();
let rows = resp_to_rows(&obj, &json, columns); // Assuming this function exists and works as intended
if rows.is_empty() {
break;
}
result.append(&mut rows.clone());
cursor = json
.get("pagination_metadata")
.and_then(|pm| pm.get("next_cursor"))
.and_then(|nc| nc.as_str())
.map(String::from);
// Break if there is no next cursor
if cursor.is_none() {
break;
}
}
self.scan_result = Some(result);
}
}
Expand Down

0 comments on commit 7f90ac1

Please sign in to comment.