Skip to content

Commit

Permalink
wip: implement a resource pool
Browse files Browse the repository at this point in the history
  • Loading branch information
jpraynaud committed May 16, 2024
1 parent d525625 commit e646377
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 11 deletions.
29 changes: 18 additions & 11 deletions mithril-aggregator/src/services/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use std::{
collections::{BTreeMap, BTreeSet, HashMap},
sync::Arc,
};
use tokio::sync::Mutex;

use mithril_common::{
crypto_helper::{MKMap, MKMapNode, MKTree},
entities::{
BlockRange, CardanoDbBeacon, CardanoTransaction, CardanoTransactionsSetProof,
TransactionHash,
},
resource_pool::ResourcePool,
signable_builder::BlockRangeRootRetriever,
StdResult,
};
Expand Down Expand Up @@ -62,7 +62,7 @@ pub trait TransactionsRetriever: Sync + Send {
pub struct MithrilProverService {
transaction_retriever: Arc<dyn TransactionsRetriever>,
block_range_root_retriever: Arc<dyn BlockRangeRootRetriever>,
mk_map_cache: Mutex<Option<MKMap<BlockRange, MKMapNode<BlockRange>>>>,
mk_map_cache: ResourcePool<MKMap<BlockRange, MKMapNode<BlockRange>>>,
}

impl MithrilProverService {
Expand All @@ -74,7 +74,7 @@ impl MithrilProverService {
Self {
transaction_retriever,
block_range_root_retriever,
mk_map_cache: Mutex::new(None),
mk_map_cache: ResourcePool::new(vec![]),
}
}

Expand Down Expand Up @@ -139,9 +139,9 @@ impl ProverService for MithrilProverService {
let mk_trees = BTreeMap::from_iter(mk_trees?);

// 3 - Compute block range roots Merkle map
// TODO: the cache computation should be done in the state machine only when new artifact is produced and at node startup
self.compute_cache(up_to).await?;
let mut mk_map = self.mk_map_cache.lock().await;
let mk_map = mk_map.as_mut().unwrap();
let mut mk_map = self.mk_map_cache.acquire_resource().await;

// 4 - Enrich the Merkle map with the block ranges Merkle trees
for (block_range, mk_tree) in mk_trees {
Expand All @@ -150,6 +150,8 @@ impl ProverService for MithrilProverService {

// 5 - Compute the proof for all transactions
if let Ok(mk_proof) = mk_map.compute_proof(transaction_hashes) {
self.mk_map_cache.return_resource(mk_map).await;

let transaction_hashes_certified: Vec<TransactionHash> = transaction_hashes
.iter()
.filter(|hash| mk_proof.contains(&hash.as_str().into()).is_ok())
Expand All @@ -166,22 +168,27 @@ impl ProverService for MithrilProverService {
}

async fn compute_cache(&self, up_to: &CardanoDbBeacon) -> StdResult<()> {
let mut mk_map = self.mk_map_cache.lock().await;
if mk_map.is_none() {
println!("Computing Merkle map from block range roots");
if self.mk_map_cache.count().await == 0 {
println!("Computing Merkle map cache from block range roots");
let mk_map_cache = self
.block_range_root_retriever
.compute_merkle_map_from_block_range_roots(up_to.immutable_file_number)
.await?;
mk_map.replace(mk_map_cache);
for i in 0..100 {
println!("Computing Merkle map cache from block range roots: {}", i);
self.mk_map_cache
.return_resource(mk_map_cache.clone())
.await;
}
self.mk_map_cache.return_resource(mk_map_cache).await;
println!("Done computing Merkle map cache from block range roots");
}

Ok(())
}

async fn clear_cache(&self) -> StdResult<()> {
let mut mk_map = self.mk_map_cache.lock().await;
mk_map.take();
self.mk_map_cache.drain().await;

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions mithril-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub mod entities;
pub mod era;
pub mod messages;
pub mod protocol;
pub mod resource_pool;
pub mod signable_builder;

cfg_test_tools! {
Expand Down
55 changes: 55 additions & 0 deletions mithril-common/src/resource_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//! Resource pool implementation
// TODO: automatic return resource to the pool with Guard and Drop
// TODO: avoid returning stale resources
// TODO: add tests
use std::{
collections::VecDeque,
sync::{Condvar, Mutex},
};

/// Resource pool
pub struct ResourcePool<T: Send + Sync> {
resources: Mutex<VecDeque<T>>,
not_empty: Condvar,
}

impl<T: Send + Sync> ResourcePool<T> {
/// Create a new resource pool
pub fn new(resources: Vec<T>) -> Self {
Self {
resources: Mutex::new(resources.into()),
not_empty: Condvar::new(),
}
}

/// Acquire a resource from the pool
pub async fn acquire_resource(&self) -> T {
let mut resources = self.resources.lock().unwrap();
while resources.is_empty() {
resources = self.not_empty.wait(resources).unwrap();
}
resources.pop_front().unwrap()
}

/// Return a resource to the pool
// TODO: automatic return resource to the pool with Guard and Drop
pub async fn return_resource(&self, resource: T) {
let mut resources = self.resources.lock().unwrap();
resources.push_back(resource);
self.not_empty.notify_one();
}

/// Drain the pool
pub async fn drain(&self) {
let mut resources = self.resources.lock().unwrap();
let _ = resources.drain(..).collect::<Vec<_>>();
}

/// Count the resources in the pool
pub async fn count(&self) -> usize {
self.resources.lock().unwrap().len()
}
}

#[cfg(test)]
mod tests {}

0 comments on commit e646377

Please sign in to comment.