From e646377afee2b061e82670a8c1f37ceeb04f6cc3 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Raynaud Date: Thu, 16 May 2024 11:43:46 +0200 Subject: [PATCH] wip: implement a resource pool --- mithril-aggregator/src/services/prover.rs | 29 +++++++----- mithril-common/src/lib.rs | 1 + mithril-common/src/resource_pool.rs | 55 +++++++++++++++++++++++ 3 files changed, 74 insertions(+), 11 deletions(-) create mode 100644 mithril-common/src/resource_pool.rs diff --git a/mithril-aggregator/src/services/prover.rs b/mithril-aggregator/src/services/prover.rs index c6e011eca2c..b6f9acdbab6 100644 --- a/mithril-aggregator/src/services/prover.rs +++ b/mithril-aggregator/src/services/prover.rs @@ -4,7 +4,6 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, sync::Arc, }; -use tokio::sync::Mutex; use mithril_common::{ crypto_helper::{MKMap, MKMapNode, MKTree}, @@ -12,6 +11,7 @@ use mithril_common::{ BlockRange, CardanoDbBeacon, CardanoTransaction, CardanoTransactionsSetProof, TransactionHash, }, + resource_pool::ResourcePool, signable_builder::BlockRangeRootRetriever, StdResult, }; @@ -62,7 +62,7 @@ pub trait TransactionsRetriever: Sync + Send { pub struct MithrilProverService { transaction_retriever: Arc, block_range_root_retriever: Arc, - mk_map_cache: Mutex>>>, + mk_map_cache: ResourcePool>>, } impl MithrilProverService { @@ -74,7 +74,7 @@ impl MithrilProverService { Self { transaction_retriever, block_range_root_retriever, - mk_map_cache: Mutex::new(None), + mk_map_cache: ResourcePool::new(vec![]), } } @@ -139,9 +139,9 @@ impl ProverService for MithrilProverService { let mk_trees = BTreeMap::from_iter(mk_trees?); // 3 - Compute block range roots Merkle map + // TODO: the cache computation should be done in the state machine only when new artifact is produced and at node startup self.compute_cache(up_to).await?; - let mut mk_map = self.mk_map_cache.lock().await; - let mk_map = mk_map.as_mut().unwrap(); + let mut mk_map = self.mk_map_cache.acquire_resource().await; // 4 - Enrich the Merkle map with the block ranges Merkle trees for (block_range, mk_tree) in mk_trees { @@ -150,6 +150,8 @@ impl ProverService for MithrilProverService { // 5 - Compute the proof for all transactions if let Ok(mk_proof) = mk_map.compute_proof(transaction_hashes) { + self.mk_map_cache.return_resource(mk_map).await; + let transaction_hashes_certified: Vec = transaction_hashes .iter() .filter(|hash| mk_proof.contains(&hash.as_str().into()).is_ok()) @@ -166,22 +168,27 @@ impl ProverService for MithrilProverService { } async fn compute_cache(&self, up_to: &CardanoDbBeacon) -> StdResult<()> { - let mut mk_map = self.mk_map_cache.lock().await; - if mk_map.is_none() { - println!("Computing Merkle map from block range roots"); + if self.mk_map_cache.count().await == 0 { + println!("Computing Merkle map cache from block range roots"); let mk_map_cache = self .block_range_root_retriever .compute_merkle_map_from_block_range_roots(up_to.immutable_file_number) .await?; - mk_map.replace(mk_map_cache); + for i in 0..100 { + println!("Computing Merkle map cache from block range roots: {}", i); + self.mk_map_cache + .return_resource(mk_map_cache.clone()) + .await; + } + self.mk_map_cache.return_resource(mk_map_cache).await; + println!("Done computing Merkle map cache from block range roots"); } Ok(()) } async fn clear_cache(&self) -> StdResult<()> { - let mut mk_map = self.mk_map_cache.lock().await; - mk_map.take(); + self.mk_map_cache.drain().await; Ok(()) } diff --git a/mithril-common/src/lib.rs b/mithril-common/src/lib.rs index 6fb6924cfc4..ea8adf2c2f2 100644 --- a/mithril-common/src/lib.rs +++ b/mithril-common/src/lib.rs @@ -59,6 +59,7 @@ pub mod entities; pub mod era; pub mod messages; pub mod protocol; +pub mod resource_pool; pub mod signable_builder; cfg_test_tools! { diff --git a/mithril-common/src/resource_pool.rs b/mithril-common/src/resource_pool.rs new file mode 100644 index 00000000000..4a4f4d22f5a --- /dev/null +++ b/mithril-common/src/resource_pool.rs @@ -0,0 +1,55 @@ +//! Resource pool implementation +// TODO: automatic return resource to the pool with Guard and Drop +// TODO: avoid returning stale resources +// TODO: add tests +use std::{ + collections::VecDeque, + sync::{Condvar, Mutex}, +}; + +/// Resource pool +pub struct ResourcePool { + resources: Mutex>, + not_empty: Condvar, +} + +impl ResourcePool { + /// Create a new resource pool + pub fn new(resources: Vec) -> Self { + Self { + resources: Mutex::new(resources.into()), + not_empty: Condvar::new(), + } + } + + /// Acquire a resource from the pool + pub async fn acquire_resource(&self) -> T { + let mut resources = self.resources.lock().unwrap(); + while resources.is_empty() { + resources = self.not_empty.wait(resources).unwrap(); + } + resources.pop_front().unwrap() + } + + /// Return a resource to the pool + // TODO: automatic return resource to the pool with Guard and Drop + pub async fn return_resource(&self, resource: T) { + let mut resources = self.resources.lock().unwrap(); + resources.push_back(resource); + self.not_empty.notify_one(); + } + + /// Drain the pool + pub async fn drain(&self) { + let mut resources = self.resources.lock().unwrap(); + let _ = resources.drain(..).collect::>(); + } + + /// Count the resources in the pool + pub async fn count(&self) -> usize { + self.resources.lock().unwrap().len() + } +} + +#[cfg(test)] +mod tests {}