From 79f0ca6a56e5bf21ec1279fabcb9a1374ff955c6 Mon Sep 17 00:00:00 2001 From: stefan-mysten <135084671+stefan-mysten@users.noreply.github.com> Date: Mon, 30 Sep 2024 18:10:00 -0700 Subject: [PATCH 1/3] Add more stream functions --- crates/sui-graphql-client/src/lib.rs | 172 +++++++++++++++++- .../src/query_types/events.rs | 2 +- .../src/query_types/object.rs | 8 +- .../src/query_types/transaction.rs | 2 +- 4 files changed, 170 insertions(+), 14 deletions(-) diff --git a/crates/sui-graphql-client/src/lib.rs b/crates/sui-graphql-client/src/lib.rs index 07cc610b6..1d3f65d4b 100644 --- a/crates/sui-graphql-client/src/lib.rs +++ b/crates/sui-graphql-client/src/lib.rs @@ -6,6 +6,7 @@ pub mod faucet; pub mod query_types; +use async_stream::try_stream; use base64ct::Encoding; use query_types::ActiveValidatorsArgs; use query_types::ActiveValidatorsQuery; @@ -322,8 +323,8 @@ impl Client { pub async fn coins( &self, owner: Address, - after: Option<&str>, - before: Option<&str>, + after: Option, + before: Option, first: Option, last: Option, coin_type: Option<&str>, @@ -359,16 +360,17 @@ impl Client { pub fn coins_stream<'a>( &'a self, owner: Address, - coin_type: Option<&'a str>, + coin_type: Option, ) -> Pin> + 'a>> { - Box::pin(async_stream::try_stream! { + let coin_type = coin_type.unwrap_or_else(|| "0x2::coin::Coin".to_string()); + Box::pin(try_stream! { let mut after = None; loop { let response = self.objects( - after.as_deref(), + after, None, Some(ObjectFilter { - type_: Some(coin_type.unwrap_or("0x2::coin::Coin")), + type_: Some(&coin_type), owner: Some(owner), object_ids: None, object_keys: None, @@ -554,6 +556,35 @@ impl Client { } } + pub async fn events_stream<'a>( + &'a self, + after: Option, + before: Option, + filter: Option, + first: Option, + last: Option, + ) -> Pin> + 'a>> { + Box::pin(try_stream! { + let mut after = after; + loop { + let response = self.events(filter.clone(), after, before.clone(), first, last).await?; + if let Some(page) = response { + for event in page.data { + yield event; + } + + if let Some(end_cursor) = page.page_info.end_cursor { + after = Some(end_cursor); + } else { + break; + } + } else { + break; + } + } + }) + } + // =========================================================================== // Objects API // =========================================================================== @@ -615,8 +646,8 @@ impl Client { /// ``` pub async fn objects( &self, - after: Option<&str>, - before: Option<&str>, + after: Option, + before: Option, filter: Option>, first: Option, last: Option, @@ -659,6 +690,36 @@ impl Client { } } + /// Stream objects. + pub async fn objects_stream<'a>( + &'a self, + after: Option, + before: Option, + filter: Option>, + first: Option, + last: Option, + ) -> Pin> + 'a>> { + Box::pin(try_stream! { + let mut after = after; + loop { + let response = self.objects(after, before.clone(), filter.clone(), first, last).await?; + if let Some(page) = response { + for object in page.data { + yield object; + } + + if let Some(end_cursor) = page.page_info.end_cursor { + after = Some(end_cursor); + } else { + break; + } + } else { + break; + } + } + }) + } + /// Return the object's bcs content [`Vec`] based on the provided [`Address`]. pub async fn object_bcs(&self, object_id: Address) -> Result>, Error> { let operation = ObjectQuery::build(ObjectQueryArgs { @@ -789,12 +850,43 @@ impl Client { Ok(None) } } + + /// Stream of transactions based on the provided filters. + pub async fn transactions_stream<'a>( + &'a self, + after: Option, + before: Option, + first: Option, + last: Option, + filter: Option, + ) -> Pin> + 'a>> { + Box::pin(try_stream! { + let mut after = after; + loop { + let response = self.transactions(after, before.clone(), first, last, filter.clone()).await?; + if let Some(page) = response { + for tx in page.data { + yield tx; + } + + if let Some(end_cursor) = page.page_info.end_cursor { + after = Some(end_cursor); + } else { + break; + } + } else { + break; + } + } + }) + } } #[cfg(test)] mod tests { use futures::StreamExt; + use crate::query_types::{EventFilter, ObjectFilter, TransactionsFilter}; use crate::Client; use crate::DEVNET_HOST; use crate::LOCAL_HOST; @@ -1072,4 +1164,68 @@ mod tests { ); } } + + // TODO remove ignore after PR #20 is merged + #[tokio::test] + #[ignore] + async fn test_events_stream() { + let client = Client::new_testnet(); + let ef = EventFilter { + emitting_module: None, + event_type: None, + sender: None, + transaction_digest: None, + }; + + let mut stream = client + .events_stream(None, None, Some(ef), None, Some(10)) + .await; + + while let Some(result) = stream.next().await { + assert!(result.is_ok()) + } + } + + #[tokio::test] + async fn test_objects_stream() { + let client = Client::new_testnet(); + let obj_filter = ObjectFilter { + type_: Some("0x2::sui::SUI"), + owner: None, + object_ids: None, + object_keys: None, + }; + + let mut stream = client + .objects_stream(None, None, Some(obj_filter), None, Some(10)) + .await; + + while let Some(result) = stream.next().await { + assert!(result.is_ok()) + } + } + + // TODO: remove the ignore after we fix tx bcs stuff + #[tokio::test] + #[ignore] + async fn test_transactions_stream() { + let client = Client::new_testnet(); + let tx_filter = TransactionsFilter { + function: None, + kind: None, + at_checkpoint: None, + before_checkpoint: None, + changed_object: None, + input_object: None, + recv_address: None, + }; + + let mut stream = client + .transactions_stream(None, None, None, Some(10), Some(tx_filter)) + .await; + + while let Some(result) = stream.next().await { + assert!(result.is_ok()) + } + } } diff --git a/crates/sui-graphql-client/src/query_types/events.rs b/crates/sui-graphql-client/src/query_types/events.rs index 4bb70b6f6..3006b9b9a 100644 --- a/crates/sui-graphql-client/src/query_types/events.rs +++ b/crates/sui-graphql-client/src/query_types/events.rs @@ -47,7 +47,7 @@ pub struct EventConnection { pub nodes: Vec, } -#[derive(cynic::InputObject, Debug)] +#[derive(cynic::InputObject, Debug, Clone)] #[cynic(schema = "rpc", graphql_type = "EventFilter")] pub struct EventFilter { pub emitting_module: Option, diff --git a/crates/sui-graphql-client/src/query_types/object.rs b/crates/sui-graphql-client/src/query_types/object.rs index 6f8b99559..c7952738c 100644 --- a/crates/sui-graphql-client/src/query_types/object.rs +++ b/crates/sui-graphql-client/src/query_types/object.rs @@ -37,8 +37,8 @@ pub struct ObjectQueryArgs { #[derive(cynic::QueryVariables, Debug)] pub struct ObjectsQueryArgs<'a> { - pub after: Option<&'a str>, - pub before: Option<&'a str>, + pub after: Option, + pub before: Option, pub filter: Option>, pub first: Option, pub last: Option, @@ -54,7 +54,7 @@ pub struct Object { pub bcs: Option, } -#[derive(cynic::InputObject, Debug)] +#[derive(cynic::InputObject, Debug, Clone)] #[cynic(schema = "rpc", graphql_type = "ObjectFilter")] pub struct ObjectFilter<'a> { #[cynic(rename = "type")] @@ -64,7 +64,7 @@ pub struct ObjectFilter<'a> { pub object_keys: Option>, } -#[derive(cynic::InputObject, Debug)] +#[derive(cynic::InputObject, Debug, Clone)] #[cynic(schema = "rpc", graphql_type = "ObjectKey")] pub struct ObjectKey { pub object_id: Address, diff --git a/crates/sui-graphql-client/src/query_types/transaction.rs b/crates/sui-graphql-client/src/query_types/transaction.rs index 2e71d253d..66d4cbdae 100644 --- a/crates/sui-graphql-client/src/query_types/transaction.rs +++ b/crates/sui-graphql-client/src/query_types/transaction.rs @@ -72,7 +72,7 @@ pub enum TransactionBlockKindInput { ProgrammableTx, } -#[derive(cynic::InputObject, Debug)] +#[derive(cynic::InputObject, Debug, Clone)] #[cynic(schema = "rpc", graphql_type = "TransactionBlockFilter")] pub struct TransactionsFilter { pub function: Option, From a35045b5ce873c2c115fcdddfea99b887383c10e Mon Sep 17 00:00:00 2001 From: stefan-mysten <135084671+stefan-mysten@users.noreply.github.com> Date: Tue, 1 Oct 2024 18:24:59 -0700 Subject: [PATCH 2/3] Fix fmt --- crates/sui-graphql-client/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/sui-graphql-client/src/lib.rs b/crates/sui-graphql-client/src/lib.rs index 1d3f65d4b..7072f9ca0 100644 --- a/crates/sui-graphql-client/src/lib.rs +++ b/crates/sui-graphql-client/src/lib.rs @@ -886,7 +886,9 @@ impl Client { mod tests { use futures::StreamExt; - use crate::query_types::{EventFilter, ObjectFilter, TransactionsFilter}; + use crate::query_types::EventFilter; + use crate::query_types::ObjectFilter; + use crate::query_types::TransactionsFilter; use crate::Client; use crate::DEVNET_HOST; use crate::LOCAL_HOST; From 5de3cc36866d541ee3309fc618e68b64ff6e6d8b Mon Sep 17 00:00:00 2001 From: stefan-mysten <135084671+stefan-mysten@users.noreply.github.com> Date: Thu, 3 Oct 2024 22:35:41 -0700 Subject: [PATCH 3/3] Refactor to use a str reference as input for after and before filters --- crates/sui-graphql-client/src/lib.rs | 65 ++++++++++--------- .../src/query_types/events.rs | 6 +- .../src/query_types/object.rs | 4 +- .../src/query_types/transaction.rs | 6 +- 4 files changed, 41 insertions(+), 40 deletions(-) diff --git a/crates/sui-graphql-client/src/lib.rs b/crates/sui-graphql-client/src/lib.rs index 7072f9ca0..097a199d0 100644 --- a/crates/sui-graphql-client/src/lib.rs +++ b/crates/sui-graphql-client/src/lib.rs @@ -320,11 +320,11 @@ impl Client { /// /// If `coin_type` is not provided, it will default to `0x2::coin::Coin`, which will return all /// coins. For SUI coin, pass in the coin type: `0x2::coin::Coin<0x2::sui::SUI>`. - pub async fn coins( - &self, + pub async fn coins<'a>( + &'a self, owner: Address, - after: Option, - before: Option, + after: Option<&'a str>, + before: Option<&'a str>, first: Option, last: Option, coin_type: Option<&str>, @@ -360,17 +360,17 @@ impl Client { pub fn coins_stream<'a>( &'a self, owner: Address, - coin_type: Option, + coin_type: Option<&'a str>, ) -> Pin> + 'a>> { - let coin_type = coin_type.unwrap_or_else(|| "0x2::coin::Coin".to_string()); + let coin_type = coin_type.unwrap_or("0x2::coin::Coin"); + let mut after: Option = None; Box::pin(try_stream! { - let mut after = None; loop { let response = self.objects( - after, + after.as_deref(), None, Some(ObjectFilter { - type_: Some(&coin_type), + type_: Some(coin_type), owner: Some(owner), object_ids: None, object_keys: None, @@ -519,11 +519,11 @@ impl Client { // Events API // =========================================================================== - pub async fn events( - &self, + pub async fn events<'a>( + &'a self, filter: Option, - after: Option, - before: Option, + after: Option<&'a str>, + before: Option<&'a str>, first: Option, last: Option, ) -> Result>, Error> { @@ -558,16 +558,16 @@ impl Client { pub async fn events_stream<'a>( &'a self, - after: Option, - before: Option, + after: Option<&'a str>, + before: Option<&'a str>, filter: Option, first: Option, last: Option, ) -> Pin> + 'a>> { + let mut after = after.map(|s| s.to_string()); Box::pin(try_stream! { - let mut after = after; loop { - let response = self.events(filter.clone(), after, before.clone(), first, last).await?; + let response = self.events(filter.clone(), after.as_deref(), before, first, last).await?; if let Some(page) = response { for event in page.data { yield event; @@ -644,10 +644,10 @@ impl Client { /// /// let owned_objects = client.objects(None, None, Some(filter), None, None).await; /// ``` - pub async fn objects( - &self, - after: Option, - before: Option, + pub async fn objects<'a>( + &'a self, + after: Option<&'a str>, + before: Option<&'a str>, filter: Option>, first: Option, last: Option, @@ -693,16 +693,17 @@ impl Client { /// Stream objects. pub async fn objects_stream<'a>( &'a self, - after: Option, - before: Option, + after: Option<&'a str>, + before: Option<&'a str>, filter: Option>, first: Option, last: Option, ) -> Pin> + 'a>> { + let after = after.map(|s| s.to_string()); Box::pin(try_stream! { let mut after = after; loop { - let response = self.objects(after, before.clone(), filter.clone(), first, last).await?; + let response = self.objects(after.as_deref(), before, filter.clone(), first, last).await?; if let Some(page) = response { for object in page.data { yield object; @@ -769,10 +770,10 @@ impl Client { } /// Get a page of transactions based on the provided filters. - pub async fn transactions( - &self, - after: Option, - before: Option, + pub async fn transactions<'a>( + &'a self, + after: Option<&'a str>, + before: Option<&'a str>, first: Option, last: Option, filter: Option, @@ -854,16 +855,16 @@ impl Client { /// Stream of transactions based on the provided filters. pub async fn transactions_stream<'a>( &'a self, - after: Option, - before: Option, + after: Option<&'a str>, + before: Option<&'a str>, first: Option, last: Option, filter: Option, ) -> Pin> + 'a>> { + let mut after = after.map(|s| s.to_string()); Box::pin(try_stream! { - let mut after = after; loop { - let response = self.transactions(after, before.clone(), first, last, filter.clone()).await?; + let response = self.transactions(after.as_deref(), before, first, last, filter.clone()).await?; if let Some(page) = response { for tx in page.data { yield tx; diff --git a/crates/sui-graphql-client/src/query_types/events.rs b/crates/sui-graphql-client/src/query_types/events.rs index 3006b9b9a..15f13d7b1 100644 --- a/crates/sui-graphql-client/src/query_types/events.rs +++ b/crates/sui-graphql-client/src/query_types/events.rs @@ -28,10 +28,10 @@ pub struct EventsQuery { // =========================================================================== #[derive(cynic::QueryVariables, Debug)] -pub struct EventsQueryArgs { +pub struct EventsQueryArgs<'a> { pub filter: Option, - pub after: Option, - pub before: Option, + pub after: Option<&'a str>, + pub before: Option<&'a str>, pub first: Option, pub last: Option, } diff --git a/crates/sui-graphql-client/src/query_types/object.rs b/crates/sui-graphql-client/src/query_types/object.rs index c7952738c..cef8250c2 100644 --- a/crates/sui-graphql-client/src/query_types/object.rs +++ b/crates/sui-graphql-client/src/query_types/object.rs @@ -37,8 +37,8 @@ pub struct ObjectQueryArgs { #[derive(cynic::QueryVariables, Debug)] pub struct ObjectsQueryArgs<'a> { - pub after: Option, - pub before: Option, + pub after: Option<&'a str>, + pub before: Option<&'a str>, pub filter: Option>, pub first: Option, pub last: Option, diff --git a/crates/sui-graphql-client/src/query_types/transaction.rs b/crates/sui-graphql-client/src/query_types/transaction.rs index 66d4cbdae..841d89808 100644 --- a/crates/sui-graphql-client/src/query_types/transaction.rs +++ b/crates/sui-graphql-client/src/query_types/transaction.rs @@ -43,11 +43,11 @@ pub struct TransactionBlockArgs { } #[derive(cynic::QueryVariables, Debug)] -pub struct TransactionBlocksQueryArgs { +pub struct TransactionBlocksQueryArgs<'a> { pub first: Option, - pub after: Option, + pub after: Option<&'a str>, pub last: Option, - pub before: Option, + pub before: Option<&'a str>, pub filter: Option, }