Added fastnear as an alternative provider of NEAR blockchain data #109

Draft · wants to merge 6 commits into base: 0.7.x

Changes from 3 commits
8 changes: 6 additions & 2 deletions Cargo.toml
@@ -8,7 +8,7 @@ categories = ["asynchronous", "api-bindings", "network-programming"]
keywords = ["near", "near-lake", "near-indexer"]
authors = ["Near Inc <hello@nearprotocol.com>"]
edition = "2021"
rust-version = "1.75.0"
rust-version = "1.79.0"

# cargo-workspaces
[workspace.metadata.workspaces]
@@ -25,14 +25,18 @@ async-stream = "0.3.5"
async-trait = "0.1.77"
derive_builder = "0.13.0"
futures = "0.3.30"
+reqwest = { version = "0.12.7", features = ["json"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
thiserror = "1.0.56"
tokio = { version = "1.35.1", features = ["sync", "time", "rt", "macros"] }
tokio-stream = { version = "0.1.14" }
tracing = "0.1.40"

near-indexer-primitives = "0.23.0"
# Bug with deserialization of the transactions it should be fixed in the next release 0.27.0
# near-indexer-primitives = "0.26.0"
# for now we use the forked version
near-indexer-primitives = { git = 'https://github.com/kobayurii/nearcore.git', branch = "2.2.1-fork1" }
Member commented:

Reminder: this is the blocker for merging this change into the 0.7.x branch and releasing a new version.
[lib]
doctest = false
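The new reqwest dependency (with the json feature) suggests the FastNear provider fetches blocks over HTTP rather than from S3. A minimal sketch of that idea; the fetch_block helper and the /v0/block/{height} endpoint path are illustrative assumptions, not taken from this diff:

    use near_indexer_primitives::StreamerMessage;

    // Hypothetical helper: fetch a single block as JSON over HTTP.
    // The endpoint shape is an assumption, not shown in this PR.
    async fn fetch_block(
        client: &reqwest::Client,
        endpoint: &str,
        height: u64,
    ) -> reqwest::Result<StreamerMessage> {
        client
            .get(format!("{endpoint}/v0/block/{height}"))
            .send()
            .await?
            .error_for_status()?
            .json::<StreamerMessage>()
            .await
    }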
2 changes: 1 addition & 1 deletion README.md
@@ -33,7 +33,7 @@ async fn main() -> Result<(), tokio::io::Error> {
.expect("Failed to build LakeConfig");

// instantiate the NEAR Lake Framework Stream
-    let (sender, stream) = near_lake_framework::streamer(config);
+    let (sender, stream) = near_lake_framework::streamer(config.into());

// read the stream events and pass them to a handler function with
// concurrency 1
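The only README change is the `.into()` call: an existing LakeConfig is converted into the new provider-dispatching config before being handed to streamer. A sketch of the conversion this implies; the Lake and FastNear variants appear in the src/lib.rs diff below, while the From impl itself is an assumption:

    // Assumed conversion backing `config.into()`; not shown in this diff.
    impl From<LakeConfig> for NearLakeFrameworkConfig {
        fn from(config: LakeConfig) -> Self {
            NearLakeFrameworkConfig::Lake(config)
        }
    }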
292 changes: 15 additions & 277 deletions src/lib.rs
@@ -20,7 +20,7 @@
//! .expect("Failed to build LakeConfig");
//!
//! // instantiate the NEAR Lake Framework Stream
-//! let (sender, stream) = near_lake_framework::streamer(config);
+//! let (sender, stream) = near_lake_framework::streamer(config.into());
//!
//! // read the stream events and pass them to a handler function with
//! // concurrency 1
@@ -244,18 +244,13 @@
#[macro_use]
extern crate derive_builder;

-use futures::stream::StreamExt;
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::SendError;

pub use near_indexer_primitives;

pub use aws_credential_types::Credentials;
-pub use types::{LakeConfig, LakeConfigBuilder};
+pub use providers::fastnear::types::{FastNearConfig, FastNearConfigBuilder};
+pub use providers::s3::types::{LakeConfig, LakeConfigBuilder};

-pub mod s3_client;
-pub mod s3_fetchers;
-pub(crate) mod types;
+pub mod providers;

pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework";

@@ -271,284 +266,27 @@ pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework";
/// .build()
/// .expect("Failed to build LakeConfig");
///
-/// let (_, stream) = near_lake_framework::streamer(config);
+/// let (_, stream) = near_lake_framework::streamer(config.into());
///
/// while let Some(streamer_message) = stream.recv().await {
/// eprintln!("{:#?}", streamer_message);
/// }
/// # }
/// ```
pub fn streamer(
-    config: LakeConfig,
+    config: providers::NearLakeFrameworkConfig,
) -> (
tokio::task::JoinHandle<Result<(), anyhow::Error>>,
-    mpsc::Receiver<near_indexer_primitives::StreamerMessage>,
+    tokio::sync::mpsc::Receiver<near_indexer_primitives::StreamerMessage>,
) {
-    let (sender, receiver) = mpsc::channel(config.blocks_preload_pool_size);
-    (tokio::spawn(start(sender, config)), receiver)
+    let (sender, receiver) = tokio::sync::mpsc::channel(config.blocks_preload_pool_size());
+    match config {
+        providers::NearLakeFrameworkConfig::Lake(config) => {
+            (tokio::spawn(providers::s3::start(sender, config)), receiver)
+        }
+        providers::NearLakeFrameworkConfig::FastNear(config) => (
+            tokio::spawn(providers::fastnear::start(sender, config)),
+            receiver,
+        ),
+    }
 }
-
-fn stream_block_heights<'a: 'b, 'b>(
-    lake_s3_client: &'a dyn s3_client::S3Client,
-    s3_bucket_name: &'a str,
-    mut start_from_block_height: crate::types::BlockHeight,
-) -> impl futures::Stream<Item = u64> + 'b {
-    async_stream::stream! {
-        loop {
-            tracing::debug!(target: LAKE_FRAMEWORK, "Fetching a list of blocks from S3...");
-            match s3_fetchers::list_block_heights(
-                lake_s3_client,
-                s3_bucket_name,
-                start_from_block_height,
-            )
-            .await {
-                Ok(block_heights) => {
-                    if block_heights.is_empty() {
-                        tracing::debug!(
-                            target: LAKE_FRAMEWORK,
-                            "There are no newer block heights than {} in bucket {}. Fetching again in 2s...",
-                            start_from_block_height,
-                            s3_bucket_name,
-                        );
-                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
-                        continue;
-                    }
-                    tracing::debug!(
-                        target: LAKE_FRAMEWORK,
-                        "Received {} newer block heights",
-                        block_heights.len()
-                    );
-
-                    start_from_block_height = *block_heights.last().unwrap() + 1;
-                    for block_height in block_heights {
-                        tracing::debug!(target: LAKE_FRAMEWORK, "Yielding {} block height...", block_height);
-                        yield block_height;
-                    }
-                }
-                Err(err) => {
-                    tracing::warn!(
-                        target: LAKE_FRAMEWORK,
-                        "Failed to get block heights from bucket {}: {}. Retrying in 1s...",
-                        s3_bucket_name,
-                        err,
-                    );
-                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                }
-            }
-        }
-    }
-}
-
-// The only consumer of the BlockHeights Streamer
-async fn prefetch_block_heights_into_pool(
-    pending_block_heights: &mut std::pin::Pin<
-        &mut impl tokio_stream::Stream<Item = crate::types::BlockHeight>,
-    >,
-    limit: usize,
-    await_for_at_least_one: bool,
-) -> anyhow::Result<Vec<crate::types::BlockHeight>> {
-    let mut block_heights = Vec::with_capacity(limit);
-    for remaining_limit in (0..limit).rev() {
-        tracing::debug!(target: LAKE_FRAMEWORK, "Polling for the next block height without awaiting... (up to {} block heights are going to be fetched)", remaining_limit);
-        match futures::poll!(pending_block_heights.next()) {
-            std::task::Poll::Ready(Some(block_height)) => {
-                block_heights.push(block_height);
-            }
-            std::task::Poll::Pending => {
-                if await_for_at_least_one && block_heights.is_empty() {
-                    tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediately, and the prefetching blocks queue is empty, so we need to await at least a single block height to be available before proceeding...");
-                    match pending_block_heights.next().await {
-                        Some(block_height) => {
-                            block_heights.push(block_height);
-                        }
-                        None => {
-                            return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite."));
-                        }
-                    }
-                    continue;
-                }
-                tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediately, so we should not block here and keep processing the blocks.");
-                break;
-            }
-            std::task::Poll::Ready(None) => {
-                return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite."));
-            }
-        }
-    }
-    Ok(block_heights)
-}
-
-#[allow(unused_labels)] // we use loop labels for code readability
-async fn start(
-    streamer_message_sink: mpsc::Sender<near_indexer_primitives::StreamerMessage>,
-    config: LakeConfig,
-) -> anyhow::Result<()> {
-    let mut start_from_block_height = config.start_block_height;
-
-    let lake_s3_client: Box<dyn crate::s3_client::S3Client> =
-        if let Some(s3_client) = config.s3_client {
-            s3_client
-        } else if let Some(config) = config.s3_config {
-            Box::new(s3_fetchers::LakeS3Client::from_conf(config))
-        } else {
-            let aws_config = aws_config::from_env().load().await;
-            let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
-                .region(aws_types::region::Region::new(config.s3_region_name))
-                .build();
-
-            Box::new(s3_fetchers::LakeS3Client::from_conf(s3_config))
-        };
-
-    let mut last_processed_block_hash: Option<near_indexer_primitives::CryptoHash> = None;
-
-    'main: loop {
-        // At the beginning of the 'main' loop we create a Block Heights stream
-        // and prefetch the initial data into that pool.
-        // Later the 'stream' loop might exit to this 'main' one to repeat the procedure.
-        // This happens because we assume the Lake Indexer that writes to the S3 bucket
-        // might, in some cases, write block N+1 before it finishes writing block N.
-        // We need to stream blocks consistently, so we have to try to load the block again.
-
-        let pending_block_heights = stream_block_heights(
-            &*lake_s3_client,
-            &config.s3_bucket_name,
-            start_from_block_height,
-        );
-        tokio::pin!(pending_block_heights);
-
-        let mut streamer_messages_futures = futures::stream::FuturesOrdered::new();
-        tracing::debug!(
-            target: LAKE_FRAMEWORK,
-            "Prefetching up to {} blocks...",
-            config.blocks_preload_pool_size
-        );
-
-        streamer_messages_futures.extend(
-            prefetch_block_heights_into_pool(
-                &mut pending_block_heights,
-                config.blocks_preload_pool_size,
-                true,
-            )
-            .await?
-            .into_iter()
-            .map(|block_height| {
-                s3_fetchers::fetch_streamer_message(
-                    &*lake_s3_client,
-                    &config.s3_bucket_name,
-                    block_height,
-                )
-            }),
-        );
-
-        tracing::debug!(
-            target: LAKE_FRAMEWORK,
-            "Awaiting the first prefetched block..."
-        );
-        'stream: while let Some(streamer_message_result) = streamer_messages_futures.next().await {
-            let streamer_message = streamer_message_result.map_err(|err| {
-                tracing::error!(
-                    target: LAKE_FRAMEWORK,
-                    "Failed to fetch StreamerMessage with error: \n{:#?}",
-                    err,
-                );
-                err
-            })?;
-
-            tracing::debug!(
-                target: LAKE_FRAMEWORK,
-                "Received block #{} ({})",
-                streamer_message.block.header.height,
-                streamer_message.block.header.hash
-            );
-            // check if we have `last_processed_block_hash` (might be None only on start)
-            if let Some(prev_block_hash) = last_processed_block_hash {
-                // compare `last_processed_block_hash` with `block.header.prev_hash` of the current
-                // block (ensure we don't miss anything from S3)
-                // retrieve the data from S3 if the prev_hashes don't match and repeat the main loop step
-                if prev_block_hash != streamer_message.block.header.prev_hash {
-                    tracing::warn!(
-                        target: LAKE_FRAMEWORK,
-                        "`prev_hash` does not match, refetching the data from S3 in 200ms",
-                    );
-                    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
-                    break 'stream;
-                }
-            }
-
-            // store current block info as `last_processed_block_*` for the next iteration
-            last_processed_block_hash = Some(streamer_message.block.header.hash);
-            start_from_block_height = streamer_message.block.header.height + 1;
-
-            tracing::debug!(
-                target: LAKE_FRAMEWORK,
-                "Prefetching up to {} blocks... (there are {} blocks in the prefetching pool)",
-                config.blocks_preload_pool_size,
-                streamer_messages_futures.len(),
-            );
-            tracing::debug!(
-                target: LAKE_FRAMEWORK,
-                "Streaming block #{} ({})",
-                streamer_message.block.header.height,
-                streamer_message.block.header.hash
-            );
-            let blocks_preload_pool_current_len = streamer_messages_futures.len();
-
-            let prefetched_block_heights_future = prefetch_block_heights_into_pool(
-                &mut pending_block_heights,
-                config
-                    .blocks_preload_pool_size
-                    .saturating_sub(blocks_preload_pool_current_len),
-                blocks_preload_pool_current_len == 0,
-            );
-
-            let streamer_message_sink_send_future = streamer_message_sink.send(streamer_message);
-
-            let (prefetch_res, send_res): (
-                Result<Vec<types::BlockHeight>, anyhow::Error>,
-                Result<_, SendError<near_indexer_primitives::StreamerMessage>>,
-            ) = futures::join!(
-                prefetched_block_heights_future,
-                streamer_message_sink_send_future,
-            );
-
-            if let Err(SendError(err)) = send_res {
-                tracing::debug!(
-                    target: LAKE_FRAMEWORK,
-                    "Failed to send StreamerMessage (#{:0>12}) to the channel. Channel is closed, exiting \n{:?}",
-                    start_from_block_height - 1,
-                    err,
-                );
-                return Ok(());
-            }
-
-            streamer_messages_futures.extend(
-                prefetch_res
-                    .map_err(|err| {
-                        tracing::error!(
-                            target: LAKE_FRAMEWORK,
-                            "Failed to prefetch block heights to the prefetching pool with error: \n{:#?}",
-                            err
-                        );
-                        err
-                    })?
-                    .into_iter()
-                    .map(|block_height| {
-                        s3_fetchers::fetch_streamer_message(
-                            &*lake_s3_client,
-                            &config.s3_bucket_name,
-                            block_height,
-                        )
-                    }),
-            );
-        }
-
-        tracing::warn!(
-            target: LAKE_FRAMEWORK,
-            "Exited from the 'stream' loop. It may happen in two cases:\n
-            1. Blocks have ended (impossible; it might be an error with the Lake buckets),\n
-            2. Received a block whose prev_hash doesn't match the previously streamed block.\n
-            Will attempt to restart the stream from block #{}",
-            start_from_block_height,
-        );
-    }
-}
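With the dispatch above, existing S3 consumers only need the added `.into()`, while FastNear consumers presumably build a FastNearConfig via the newly re-exported FastNearConfigBuilder. A minimal usage sketch, assuming the 0.7.x LakeConfigBuilder helpers (mainnet(), start_block_height()) stay unchanged; the FastNear builder's exact methods are not shown in this diff:

    use near_lake_framework::{streamer, LakeConfigBuilder};

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // The existing S3-backed config keeps working through `.into()`.
        let config = LakeConfigBuilder::default()
            .mainnet()
            .start_block_height(100_000_000)
            .build()
            .expect("Failed to build LakeConfig");

        let (_handle, mut stream) = streamer(config.into());
        while let Some(streamer_message) = stream.recv().await {
            eprintln!("Block #{}", streamer_message.block.header.height);
        }
        Ok(())
    }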