Skip to content

Commit

Permalink
Add more stream functions
Browse files Browse the repository at this point in the history
  • Loading branch information
stefan-mysten committed Oct 2, 2024
1 parent 73d2e96 commit 79f0ca6
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 14 deletions.
172 changes: 164 additions & 8 deletions crates/sui-graphql-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
pub mod faucet;
pub mod query_types;

use async_stream::try_stream;
use base64ct::Encoding;
use query_types::ActiveValidatorsArgs;
use query_types::ActiveValidatorsQuery;
Expand Down Expand Up @@ -322,8 +323,8 @@ impl Client {
pub async fn coins(
&self,
owner: Address,
after: Option<&str>,
before: Option<&str>,
after: Option<String>,
before: Option<String>,
first: Option<i32>,
last: Option<i32>,
coin_type: Option<&str>,
Expand Down Expand Up @@ -359,16 +360,17 @@ impl Client {
pub fn coins_stream<'a>(
&'a self,
owner: Address,
coin_type: Option<&'a str>,
coin_type: Option<String>,
) -> Pin<Box<dyn Stream<Item = Result<Coin, Error>> + 'a>> {
Box::pin(async_stream::try_stream! {
let coin_type = coin_type.unwrap_or_else(|| "0x2::coin::Coin".to_string());
Box::pin(try_stream! {
let mut after = None;
loop {
let response = self.objects(
after.as_deref(),
after,
None,
Some(ObjectFilter {
type_: Some(coin_type.unwrap_or("0x2::coin::Coin")),
type_: Some(&coin_type),
owner: Some(owner),
object_ids: None,
object_keys: None,
Expand Down Expand Up @@ -554,6 +556,35 @@ impl Client {
}
}

pub async fn events_stream<'a>(
&'a self,
after: Option<String>,
before: Option<String>,
filter: Option<EventFilter>,
first: Option<i32>,
last: Option<i32>,
) -> Pin<Box<dyn Stream<Item = Result<Event, Error>> + 'a>> {
Box::pin(try_stream! {
let mut after = after;
loop {
let response = self.events(filter.clone(), after, before.clone(), first, last).await?;
if let Some(page) = response {
for event in page.data {
yield event;
}

if let Some(end_cursor) = page.page_info.end_cursor {
after = Some(end_cursor);
} else {
break;
}
} else {
break;
}
}
})
}

// ===========================================================================
// Objects API
// ===========================================================================
Expand Down Expand Up @@ -615,8 +646,8 @@ impl Client {
/// ```
pub async fn objects(
&self,
after: Option<&str>,
before: Option<&str>,
after: Option<String>,
before: Option<String>,
filter: Option<ObjectFilter<'_>>,
first: Option<i32>,
last: Option<i32>,
Expand Down Expand Up @@ -659,6 +690,36 @@ impl Client {
}
}

/// Stream objects.
pub async fn objects_stream<'a>(
&'a self,
after: Option<String>,
before: Option<String>,
filter: Option<ObjectFilter<'a>>,
first: Option<i32>,
last: Option<i32>,
) -> Pin<Box<dyn Stream<Item = Result<Object, Error>> + 'a>> {
Box::pin(try_stream! {
let mut after = after;
loop {
let response = self.objects(after, before.clone(), filter.clone(), first, last).await?;
if let Some(page) = response {
for object in page.data {
yield object;
}

if let Some(end_cursor) = page.page_info.end_cursor {
after = Some(end_cursor);
} else {
break;
}
} else {
break;
}
}
})
}

/// Return the object's bcs content [`Vec<u8>`] based on the provided [`Address`].
pub async fn object_bcs(&self, object_id: Address) -> Result<Option<Vec<u8>>, Error> {
let operation = ObjectQuery::build(ObjectQueryArgs {
Expand Down Expand Up @@ -789,12 +850,43 @@ impl Client {
Ok(None)
}
}

/// Stream of transactions based on the provided filters.
pub async fn transactions_stream<'a>(
&'a self,
after: Option<String>,
before: Option<String>,
first: Option<i32>,
last: Option<i32>,
filter: Option<TransactionsFilter>,
) -> Pin<Box<dyn Stream<Item = Result<SignedTransaction, Error>> + 'a>> {
Box::pin(try_stream! {
let mut after = after;
loop {
let response = self.transactions(after, before.clone(), first, last, filter.clone()).await?;
if let Some(page) = response {
for tx in page.data {
yield tx;
}

if let Some(end_cursor) = page.page_info.end_cursor {
after = Some(end_cursor);
} else {
break;
}
} else {
break;
}
}
})
}
}

#[cfg(test)]
mod tests {
use futures::StreamExt;

use crate::query_types::{EventFilter, ObjectFilter, TransactionsFilter};
use crate::Client;
use crate::DEVNET_HOST;
use crate::LOCAL_HOST;
Expand Down Expand Up @@ -1072,4 +1164,68 @@ mod tests {
);
}
}

// TODO remove ignore after PR #20 is merged
#[tokio::test]
#[ignore]
async fn test_events_stream() {
let client = Client::new_testnet();
let ef = EventFilter {
emitting_module: None,
event_type: None,
sender: None,
transaction_digest: None,
};

let mut stream = client
.events_stream(None, None, Some(ef), None, Some(10))
.await;

while let Some(result) = stream.next().await {
assert!(result.is_ok())
}
}

#[tokio::test]
async fn test_objects_stream() {
let client = Client::new_testnet();
let obj_filter = ObjectFilter {
type_: Some("0x2::sui::SUI"),
owner: None,
object_ids: None,
object_keys: None,
};

let mut stream = client
.objects_stream(None, None, Some(obj_filter), None, Some(10))
.await;

while let Some(result) = stream.next().await {
assert!(result.is_ok())
}
}

// TODO: remove the ignore after we fix tx bcs stuff
#[tokio::test]
#[ignore]
async fn test_transactions_stream() {
let client = Client::new_testnet();
let tx_filter = TransactionsFilter {
function: None,
kind: None,
at_checkpoint: None,
before_checkpoint: None,
changed_object: None,
input_object: None,
recv_address: None,
};

let mut stream = client
.transactions_stream(None, None, None, Some(10), Some(tx_filter))
.await;

while let Some(result) = stream.next().await {
assert!(result.is_ok())
}
}
}
2 changes: 1 addition & 1 deletion crates/sui-graphql-client/src/query_types/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct EventConnection {
pub nodes: Vec<Event>,
}

#[derive(cynic::InputObject, Debug)]
#[derive(cynic::InputObject, Debug, Clone)]
#[cynic(schema = "rpc", graphql_type = "EventFilter")]
pub struct EventFilter {
pub emitting_module: Option<String>,
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-graphql-client/src/query_types/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub struct ObjectQueryArgs {

#[derive(cynic::QueryVariables, Debug)]
pub struct ObjectsQueryArgs<'a> {
pub after: Option<&'a str>,
pub before: Option<&'a str>,
pub after: Option<String>,
pub before: Option<String>,
pub filter: Option<ObjectFilter<'a>>,
pub first: Option<i32>,
pub last: Option<i32>,
Expand All @@ -54,7 +54,7 @@ pub struct Object {
pub bcs: Option<Base64>,
}

#[derive(cynic::InputObject, Debug)]
#[derive(cynic::InputObject, Debug, Clone)]
#[cynic(schema = "rpc", graphql_type = "ObjectFilter")]
pub struct ObjectFilter<'a> {
#[cynic(rename = "type")]
Expand All @@ -64,7 +64,7 @@ pub struct ObjectFilter<'a> {
pub object_keys: Option<Vec<ObjectKey>>,
}

#[derive(cynic::InputObject, Debug)]
#[derive(cynic::InputObject, Debug, Clone)]
#[cynic(schema = "rpc", graphql_type = "ObjectKey")]
pub struct ObjectKey {
pub object_id: Address,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-graphql-client/src/query_types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub enum TransactionBlockKindInput {
ProgrammableTx,
}

#[derive(cynic::InputObject, Debug)]
#[derive(cynic::InputObject, Debug, Clone)]
#[cynic(schema = "rpc", graphql_type = "TransactionBlockFilter")]
pub struct TransactionsFilter {
pub function: Option<String>,
Expand Down

0 comments on commit 79f0ca6

Please sign in to comment.