Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more stream functions #24

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 166 additions & 8 deletions crates/sui-graphql-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
pub mod faucet;
pub mod query_types;

use async_stream::try_stream;
use base64ct::Encoding;
use query_types::ActiveValidatorsArgs;
use query_types::ActiveValidatorsQuery;
Expand Down Expand Up @@ -322,8 +323,8 @@ impl Client {
pub async fn coins(
&self,
owner: Address,
after: Option<&str>,
before: Option<&str>,
after: Option<String>,
before: Option<String>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why require an owned string now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I did like this because I need to pass the event filter in the streams calls, so I needed to clone it. Maybe I can just pass a reference.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bmwill this is what I came up with, what do you think?

first: Option<i32>,
last: Option<i32>,
coin_type: Option<&str>,
Expand Down Expand Up @@ -359,16 +360,17 @@ impl Client {
pub fn coins_stream<'a>(
&'a self,
owner: Address,
coin_type: Option<&'a str>,
coin_type: Option<String>,
) -> Pin<Box<dyn Stream<Item = Result<Coin, Error>> + 'a>> {
Box::pin(async_stream::try_stream! {
let coin_type = coin_type.unwrap_or_else(|| "0x2::coin::Coin".to_string());
Box::pin(try_stream! {
let mut after = None;
loop {
let response = self.objects(
after.as_deref(),
after,
None,
Some(ObjectFilter {
type_: Some(coin_type.unwrap_or("0x2::coin::Coin")),
type_: Some(&coin_type),
owner: Some(owner),
object_ids: None,
object_keys: None,
Expand Down Expand Up @@ -554,6 +556,35 @@ impl Client {
}
}

pub async fn events_stream<'a>(
&'a self,
after: Option<String>,
before: Option<String>,
filter: Option<EventFilter>,
first: Option<i32>,
last: Option<i32>,
) -> Pin<Box<dyn Stream<Item = Result<Event, Error>> + 'a>> {
Box::pin(try_stream! {
let mut after = after;
loop {
let response = self.events(filter.clone(), after, before.clone(), first, last).await?;
if let Some(page) = response {
for event in page.data {
yield event;
}

if let Some(end_cursor) = page.page_info.end_cursor {
after = Some(end_cursor);
} else {
break;
}
} else {
break;
}
}
})
}

// ===========================================================================
// Objects API
// ===========================================================================
Expand Down Expand Up @@ -615,8 +646,8 @@ impl Client {
/// ```
pub async fn objects(
&self,
after: Option<&str>,
before: Option<&str>,
after: Option<String>,
before: Option<String>,
filter: Option<ObjectFilter<'_>>,
first: Option<i32>,
last: Option<i32>,
Expand Down Expand Up @@ -659,6 +690,36 @@ impl Client {
}
}

/// Stream objects.
pub async fn objects_stream<'a>(
&'a self,
after: Option<String>,
before: Option<String>,
filter: Option<ObjectFilter<'a>>,
first: Option<i32>,
last: Option<i32>,
) -> Pin<Box<dyn Stream<Item = Result<Object, Error>> + 'a>> {
Box::pin(try_stream! {
let mut after = after;
loop {
let response = self.objects(after, before.clone(), filter.clone(), first, last).await?;
if let Some(page) = response {
for object in page.data {
yield object;
}

if let Some(end_cursor) = page.page_info.end_cursor {
after = Some(end_cursor);
} else {
break;
}
} else {
break;
}
}
})
}

/// Return the object's bcs content [`Vec<u8>`] based on the provided [`Address`].
pub async fn object_bcs(&self, object_id: Address) -> Result<Option<Vec<u8>>, Error> {
let operation = ObjectQuery::build(ObjectQueryArgs {
Expand Down Expand Up @@ -789,12 +850,45 @@ impl Client {
Ok(None)
}
}

/// Stream of transactions based on the provided filters.
pub async fn transactions_stream<'a>(
&'a self,
after: Option<String>,
before: Option<String>,
first: Option<i32>,
last: Option<i32>,
filter: Option<TransactionsFilter>,
) -> Pin<Box<dyn Stream<Item = Result<SignedTransaction, Error>> + 'a>> {
Box::pin(try_stream! {
let mut after = after;
loop {
let response = self.transactions(after, before.clone(), first, last, filter.clone()).await?;
if let Some(page) = response {
for tx in page.data {
yield tx;
}

if let Some(end_cursor) = page.page_info.end_cursor {
after = Some(end_cursor);
} else {
break;
}
} else {
break;
}
}
})
}
}

#[cfg(test)]
mod tests {
use futures::StreamExt;

use crate::query_types::EventFilter;
use crate::query_types::ObjectFilter;
use crate::query_types::TransactionsFilter;
use crate::Client;
use crate::DEVNET_HOST;
use crate::LOCAL_HOST;
Expand Down Expand Up @@ -1072,4 +1166,68 @@ mod tests {
);
}
}

// TODO remove ignore after PR #20 is merged
#[tokio::test]
#[ignore]
async fn test_events_stream() {
let client = Client::new_testnet();
let ef = EventFilter {
emitting_module: None,
event_type: None,
sender: None,
transaction_digest: None,
};

let mut stream = client
.events_stream(None, None, Some(ef), None, Some(10))
.await;

while let Some(result) = stream.next().await {
assert!(result.is_ok())
}
}

#[tokio::test]
async fn test_objects_stream() {
let client = Client::new_testnet();
let obj_filter = ObjectFilter {
type_: Some("0x2::sui::SUI"),
owner: None,
object_ids: None,
object_keys: None,
};

let mut stream = client
.objects_stream(None, None, Some(obj_filter), None, Some(10))
.await;

while let Some(result) = stream.next().await {
assert!(result.is_ok())
}
}

// TODO: remove the ignore after we fix tx bcs stuff
#[tokio::test]
#[ignore]
async fn test_transactions_stream() {
let client = Client::new_testnet();
let tx_filter = TransactionsFilter {
function: None,
kind: None,
at_checkpoint: None,
before_checkpoint: None,
changed_object: None,
input_object: None,
recv_address: None,
};

let mut stream = client
.transactions_stream(None, None, None, Some(10), Some(tx_filter))
.await;

while let Some(result) = stream.next().await {
assert!(result.is_ok())
}
}
}
2 changes: 1 addition & 1 deletion crates/sui-graphql-client/src/query_types/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct EventConnection {
pub nodes: Vec<Event>,
}

#[derive(cynic::InputObject, Debug)]
#[derive(cynic::InputObject, Debug, Clone)]
#[cynic(schema = "rpc", graphql_type = "EventFilter")]
pub struct EventFilter {
pub emitting_module: Option<String>,
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-graphql-client/src/query_types/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub struct ObjectQueryArgs {

#[derive(cynic::QueryVariables, Debug)]
pub struct ObjectsQueryArgs<'a> {
pub after: Option<&'a str>,
pub before: Option<&'a str>,
pub after: Option<String>,
pub before: Option<String>,
pub filter: Option<ObjectFilter<'a>>,
pub first: Option<i32>,
pub last: Option<i32>,
Expand All @@ -54,7 +54,7 @@ pub struct Object {
pub bcs: Option<Base64>,
}

#[derive(cynic::InputObject, Debug)]
#[derive(cynic::InputObject, Debug, Clone)]
#[cynic(schema = "rpc", graphql_type = "ObjectFilter")]
pub struct ObjectFilter<'a> {
#[cynic(rename = "type")]
Expand All @@ -64,7 +64,7 @@ pub struct ObjectFilter<'a> {
pub object_keys: Option<Vec<ObjectKey>>,
}

#[derive(cynic::InputObject, Debug)]
#[derive(cynic::InputObject, Debug, Clone)]
#[cynic(schema = "rpc", graphql_type = "ObjectKey")]
pub struct ObjectKey {
pub object_id: Address,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-graphql-client/src/query_types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub enum TransactionBlockKindInput {
ProgrammableTx,
}

#[derive(cynic::InputObject, Debug)]
#[derive(cynic::InputObject, Debug, Clone)]
#[cynic(schema = "rpc", graphql_type = "TransactionBlockFilter")]
pub struct TransactionsFilter {
pub function: Option<String>,
Expand Down