Skip to content

Commit

Permalink
feat: implement a resource pool
Browse files Browse the repository at this point in the history
  • Loading branch information
jpraynaud committed May 22, 2024
1 parent ead9a29 commit 11a0237
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 11 deletions.
36 changes: 25 additions & 11 deletions mithril-aggregator/src/services/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use rayon::prelude::*;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
sync::Arc,
time::Duration,
};
use tokio::sync::Mutex;

use mithril_common::{
crypto_helper::{MKMap, MKMapNode, MKTree},
entities::{
BlockRange, CardanoDbBeacon, CardanoTransaction, CardanoTransactionsSetProof,
TransactionHash,
},
resource_pool::ResourcePool,
signable_builder::BlockRangeRootRetriever,
StdResult,
};
Expand Down Expand Up @@ -62,7 +63,7 @@ pub trait TransactionsRetriever: Sync + Send {
pub struct MithrilProverService {
transaction_retriever: Arc<dyn TransactionsRetriever>,
block_range_root_retriever: Arc<dyn BlockRangeRootRetriever>,
mk_map_cache: Mutex<Option<MKMap<BlockRange, MKMapNode<BlockRange>>>>,
mk_map_cache: ResourcePool<MKMap<BlockRange, MKMapNode<BlockRange>>>,
}

impl MithrilProverService {
Expand All @@ -74,7 +75,7 @@ impl MithrilProverService {
Self {
transaction_retriever,
block_range_root_retriever,
mk_map_cache: Mutex::new(None),
mk_map_cache: ResourcePool::default(),
}
}

Expand Down Expand Up @@ -139,9 +140,11 @@ impl ProverService for MithrilProverService {
let mk_trees = BTreeMap::from_iter(mk_trees?);

// 3 - Compute block range roots Merkle map
// TODO: the cache computation should be done in the state machine only when new artifact is produced and at node startup
self.compute_cache(up_to).await?;
let mut mk_map = self.mk_map_cache.lock().await;
let mk_map = mk_map.as_mut().unwrap();
let mut mk_map = self
.mk_map_cache
.acquire_resource(Duration::from_millis(1000))?;

// 4 - Enrich the Merkle map with the block ranges Merkle trees
for (block_range, mk_tree) in mk_trees {
Expand All @@ -150,6 +153,9 @@ impl ProverService for MithrilProverService {

// 5 - Compute the proof for all transactions
if let Ok(mk_proof) = mk_map.compute_proof(transaction_hashes) {
self.mk_map_cache
.return_resource(mk_map.into_inner(), mk_map.discriminant());

let transaction_hashes_certified: Vec<TransactionHash> = transaction_hashes
.iter()
.filter(|hash| mk_proof.contains(&hash.as_str().into()).is_ok())
Expand All @@ -166,22 +172,30 @@ impl ProverService for MithrilProverService {
}

async fn compute_cache(&self, up_to: &CardanoDbBeacon) -> StdResult<()> {
let mut mk_map = self.mk_map_cache.lock().await;
if mk_map.is_none() {
println!("Computing Merkle map from block range roots");
if self.mk_map_cache.count() == 0 {
println!("Computing Merkle map cache from block range roots");

let mk_map_cache = self
.block_range_root_retriever
.compute_merkle_map_from_block_range_roots(up_to.immutable_file_number)
.await?;
mk_map.replace(mk_map_cache);
let discriminant_new = self.mk_map_cache.discriminant() + 1;
self.mk_map_cache.set_discriminant(discriminant_new);
for i in 0..10 {
println!("Computing Merkle map cache from block range roots: {}", i);
self.mk_map_cache
.return_resource(mk_map_cache.clone(), discriminant_new);
}
self.mk_map_cache
.return_resource(mk_map_cache, discriminant_new);
println!("Done computing Merkle map cache from block range roots");
}

Ok(())
}

async fn clear_cache(&self) -> StdResult<()> {
let mut mk_map = self.mk_map_cache.lock().await;
mk_map.take();
self.mk_map_cache.drain();

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions mithril-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub mod entities;
pub mod era;
pub mod messages;
pub mod protocol;
pub mod resource_pool;
pub mod signable_builder;

cfg_test_tools! {
Expand Down
269 changes: 269 additions & 0 deletions mithril-common/src/resource_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
//! Resource pool implementation
// TODO: better error implementation
use anyhow::anyhow;
use std::{
collections::VecDeque,
ops::{Deref, DerefMut},
sync::{Condvar, Mutex},
time::Duration,
};

use crate::StdResult;

/// Resource pool FIFO implementation
pub struct ResourcePool<T: Send + Sync> {
/// The size of the pool
size: usize,

/// Discriminant for the resource pool to check if a returned resource is stale
discriminant: Mutex<u64>,

/// Resources in the pool
resources: Mutex<VecDeque<T>>,

/// Condition variable to notify when a resource is available
not_empty: Condvar,
}

impl<T: Send + Sync> ResourcePool<T> {
/// Create a new resource pool
pub fn new(pool_size: usize, resources: Vec<T>) -> Self {
Self {
size: pool_size,
discriminant: Mutex::new(0),
resources: Mutex::new(resources.into()),
not_empty: Condvar::new(),
}
}

/// Acquire a resource from the pool with a timeout
pub fn acquire_resource(&self, timeout: Duration) -> StdResult<ResourcePoolItem<'_, T>> {
let mut resources = self.resources.lock().unwrap();
while resources.is_empty() {
let (resources_locked, timeout) =
self.not_empty.wait_timeout(resources, timeout).unwrap();
if timeout.timed_out() {
return Err(anyhow!("Acquire resource has timed out"));
}
resources = resources_locked;
}

Ok(ResourcePoolItem::new(self, resources.pop_front().unwrap()))
}

/// Return a resource to the pool
/// A resource is returned to the pool only if the discriminant matches
/// and if the pool is not already full
pub fn return_resource(&self, resource: T, discriminant: u64) {
if self.count() == self.size {
return;
}
let mut resources = self.resources.lock().unwrap();
if self.discriminant() != discriminant {
return;
}
resources.push_back(resource);
self.not_empty.notify_one();
}

/// Drain the pool
pub fn drain(&self) {
let mut resources = self.resources.lock().unwrap();
let _ = resources.drain(..).collect::<Vec<_>>();
}

/// Get the discriminant of the resource pool item
pub fn discriminant(&self) -> u64 {
*self.discriminant.lock().unwrap()
}

/// Set the discriminant of the resource pool item
pub fn set_discriminant(&self, discriminant: u64) {
*self.discriminant.lock().unwrap() = discriminant;
}

/// Count the resources in the pool
pub fn count(&self) -> usize {
self.resources.lock().unwrap().len()
}

/// Size of the resource pool
pub fn size(&self) -> usize {
self.size
}
}

impl<T: Send + Sync> Default for ResourcePool<T> {
fn default() -> Self {
Self::new(10, vec![])
}
}

/// Resource pool item which will return the resource to the pool when dropped
pub struct ResourcePoolItem<'a, T: Send + Sync> {
resource_pool: &'a ResourcePool<T>,
discriminant: u64,
resource: Option<T>,
}

impl<'a, T: Send + Sync> ResourcePoolItem<'a, T> {
/// Create a new resource pool item
pub fn new(resource_pool: &'a ResourcePool<T>, resource: T) -> Self {
let discriminant = *resource_pool.discriminant.lock().unwrap();
Self {
resource_pool,
discriminant,
resource: Some(resource),
}
}

/// Get the discriminant of the resource pool item
pub fn discriminant(&self) -> u64 {
self.discriminant
}

/// Get a reference to the inner resource
pub fn resource(&self) -> &T {
self.resource.as_ref().unwrap()
}

/// Take the inner resource
pub fn into_inner(&mut self) -> T {
self.resource.take().unwrap()
}
}

impl<T: Send + Sync> Deref for ResourcePoolItem<'_, T> {
type Target = T;

fn deref(&self) -> &T {
self.resource.as_ref().unwrap()
}
}

impl<T: Send + Sync> DerefMut for ResourcePoolItem<'_, T> {
fn deref_mut(&mut self) -> &mut T {
self.resource.as_mut().unwrap()
}
}

impl<T: Send + Sync> Drop for ResourcePoolItem<'_, T> {
fn drop(&mut self) {
if self.resource.is_some() {
let resource = self.into_inner();
self.resource_pool
.return_resource(resource, self.discriminant);
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;

#[test]
fn test_resource_pool_acquire_returns_resource_when_available() {
let pool_size = 10;
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());

let mut resources_items = vec![];
for _ in 0..pool_size {
let resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
resources_items.push(resource_item);
}
let resources_result = resources_items
.iter_mut()
.map(|resource_item| resource_item.resource().to_owned())
.collect::<Vec<_>>();

assert_eq!(resources_expected, resources_result);
assert_eq!(pool.count(), 0);
}

#[tokio::test]
async fn test_resource_pool_acquire_locks_until_timeout_when_no_resource_available() {
let pool_size = 10;
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());

let mut resources_items = vec![];
for _ in 0..pool_size {
let resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
resources_items.push(resource_item);
}

assert!(pool.acquire_resource(Duration::from_millis(1000)).is_err());
}

#[tokio::test]
async fn test_resource_pool_drains_successfully() {
let pool_size = 10;
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
assert_eq!(pool.count(), pool_size);

pool.drain();

assert_eq!(pool.count(), 0);
}

#[tokio::test]
async fn test_resource_pool_returns_fresh_resource() {
let pool_size = 10;
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
assert_eq!(pool.count(), pool_size);

let mut resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
assert_eq!(pool.count(), pool_size - 1);
pool.return_resource(resource_item.into_inner(), pool.discriminant());

assert_eq!(pool.count(), pool_size);
}

#[tokio::test]
async fn test_resource_pool_returnsy_resource_automaticall() {
let pool_size = 10;
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
assert_eq!(pool.count(), pool_size);

{
let _resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
assert_eq!(pool.count(), pool_size - 1);
}

assert_eq!(pool.count(), pool_size);
}

#[tokio::test]
async fn test_resource_pool_does_not_return_resource_when_pool_is_full() {
let pool_size = 10;
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
assert_eq!(pool.count(), pool_size);

pool.return_resource("resource".to_string(), pool.discriminant());

assert_eq!(pool.count(), pool_size);
}

#[tokio::test]
async fn test_resource_pool_does_not_return_stale_resource() {
let pool_size = 10;
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect();
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone());
assert_eq!(pool.count(), pool_size);

let mut resource_item = pool.acquire_resource(Duration::from_millis(1000)).unwrap();
assert_eq!(pool.count(), pool_size - 1);
let discriminant_stale = pool.discriminant();
pool.set_discriminant(pool.discriminant() + 1);
pool.return_resource(resource_item.into_inner(), discriminant_stale);

assert_eq!(pool.count(), pool_size - 1);
}
}

0 comments on commit 11a0237

Please sign in to comment.