Skip to content

Commit

Permalink
feat: Use custom S3Client if configured
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Apr 15, 2024
1 parent 6a32564 commit acc3610
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ pub use near_indexer_primitives;
pub use aws_credential_types::Credentials;
pub use types::{LakeConfig, LakeConfigBuilder};

pub mod s3_client;
pub mod s3_fetchers;
pub(crate) mod types;

Expand Down Expand Up @@ -288,7 +289,7 @@ pub fn streamer(
}

fn stream_block_heights<'a: 'b, 'b>(
lake_s3_client: &'a s3_fetchers::LakeS3Client,
lake_s3_client: &'a dyn s3_client::S3Client,
s3_bucket_name: &'a str,
mut start_from_block_height: crate::types::BlockHeight,
) -> impl futures::Stream<Item = u64> + 'b {
Expand Down Expand Up @@ -384,16 +385,19 @@ async fn start(
) -> anyhow::Result<()> {
let mut start_from_block_height = config.start_block_height;

let s3_client = if let Some(config) = config.s3_config {
aws_sdk_s3::Client::from_conf(config)
} else {
let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.region(aws_types::region::Region::new(config.s3_region_name))
.build();
aws_sdk_s3::Client::from_conf(s3_config)
};
let lake_s3_client = s3_fetchers::LakeS3Client::new(s3_client.clone());
let lake_s3_client: Box<dyn crate::s3_client::S3Client> =
if let Some(s3_client) = config.s3_client {
s3_client
} else if let Some(config) = config.s3_config {
Box::new(s3_fetchers::LakeS3Client::from_conf(config))
} else {
let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
.region(aws_types::region::Region::new(config.s3_region_name))
.build();

Box::new(s3_fetchers::LakeS3Client::from_conf(s3_config))
};

let mut last_processed_block_hash: Option<near_indexer_primitives::CryptoHash> = None;

Expand All @@ -406,7 +410,7 @@ async fn start(
// We require to stream blocks consistently, so we need to try to load the block again.

let pending_block_heights = stream_block_heights(
&lake_s3_client,
&*lake_s3_client,
&config.s3_bucket_name,
start_from_block_height,
);
Expand All @@ -429,7 +433,7 @@ async fn start(
.into_iter()
.map(|block_height| {
s3_fetchers::fetch_streamer_message(
&lake_s3_client,
&*lake_s3_client,
&config.s3_bucket_name,
block_height,
)
Expand Down Expand Up @@ -530,7 +534,7 @@ async fn start(
.into_iter()
.map(|block_height| {
s3_fetchers::fetch_streamer_message(
&lake_s3_client,
&*lake_s3_client,
&config.s3_bucket_name,
block_height,
)
Expand Down

0 comments on commit acc3610

Please sign in to comment.