-
Notifications
You must be signed in to change notification settings - Fork 41
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
310 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,287 @@ | ||
//! Resource pool implementation | ||
// TODO: fix tests | ||
use anyhow::anyhow; | ||
use std::{ | ||
collections::VecDeque, | ||
ops::{Deref, DerefMut}, | ||
sync::{Condvar, Mutex}, | ||
time::Duration, | ||
}; | ||
|
||
use crate::StdResult; | ||
|
||
/// Resource pool FIFO implementation | ||
pub struct ResourcePool<T: Send + Sync> { | ||
/// The size of the pool | ||
size: usize, | ||
|
||
/// Discriminant for the resource pool to check if a returned resource is stale | ||
discriminant: Mutex<u64>, | ||
|
||
/// Resources in the pool | ||
resources: Mutex<VecDeque<T>>, | ||
|
||
/// Condition variable to notify when a resource is available | ||
not_empty: Condvar, | ||
} | ||
|
||
impl<T: Send + Sync> ResourcePool<T> { | ||
/// Create a new resource pool | ||
pub fn new(pool_size: usize, resources: Vec<T>) -> Self { | ||
Self { | ||
size: pool_size, | ||
discriminant: Mutex::new(0), | ||
resources: Mutex::new(resources.into()), | ||
not_empty: Condvar::new(), | ||
} | ||
} | ||
|
||
/// Acquire a resource from the pool | ||
pub fn acquire_resource(&self) -> ResourcePoolItem<'_, T> { | ||
self.acquire_resource_with_timeout(Duration::MAX).unwrap() | ||
} | ||
|
||
/// Acquire a resource from the pool with a timeout | ||
pub fn acquire_resource_with_timeout( | ||
&self, | ||
duration: Duration, | ||
) -> StdResult<ResourcePoolItem<'_, T>> { | ||
let mut resources = self.resources.lock().unwrap(); | ||
while resources.is_empty() { | ||
let (resources_locked, timeout) = | ||
self.not_empty.wait_timeout(resources, duration).unwrap(); | ||
if timeout.timed_out() { | ||
return Err(anyhow!("Acquire resource has timed out")); | ||
} | ||
resources = resources_locked; | ||
} | ||
|
||
Ok(ResourcePoolItem::new(self, resources.pop_front().unwrap())) | ||
} | ||
|
||
/// Return a resource to the pool | ||
/// A resource is returned to the pool only if the discriminant matches | ||
/// and if the pool is not already full | ||
pub fn return_resource(&self, resource: T, discriminant: u64) { | ||
if self.count() == self.size { | ||
return; | ||
} | ||
let mut resources = self.resources.lock().unwrap(); | ||
if self.discriminant() != discriminant { | ||
return; | ||
} | ||
resources.push_back(resource); | ||
self.not_empty.notify_one(); | ||
} | ||
|
||
/// Drain the pool | ||
pub fn drain(&self) { | ||
let mut resources = self.resources.lock().unwrap(); | ||
let _ = resources.drain(..).collect::<Vec<_>>(); | ||
} | ||
|
||
/// Get the discriminant of the resource pool item | ||
pub fn discriminant(&self) -> u64 { | ||
*self.discriminant.lock().unwrap() | ||
} | ||
|
||
/// Set the discriminant of the resource pool item | ||
pub fn set_discriminant(&self, discriminant: u64) { | ||
*self.discriminant.lock().unwrap() = discriminant; | ||
} | ||
|
||
/// Count the resources in the pool | ||
pub fn count(&self) -> usize { | ||
self.resources.lock().unwrap().len() | ||
} | ||
|
||
/// Size of the resource pool | ||
pub fn size(&self) -> usize { | ||
self.size | ||
} | ||
} | ||
|
||
impl<T: Send + Sync> Default for ResourcePool<T> { | ||
fn default() -> Self { | ||
Self::new(10, vec![]) | ||
} | ||
} | ||
|
||
/// Resource pool item which will return the resource to the pool when dropped | ||
pub struct ResourcePoolItem<'a, T: Send + Sync> { | ||
resource_pool: &'a ResourcePool<T>, | ||
discriminant: u64, | ||
resource: Option<T>, | ||
} | ||
|
||
impl<'a, T: Send + Sync> ResourcePoolItem<'a, T> { | ||
/// Create a new resource pool item | ||
pub fn new(resource_pool: &'a ResourcePool<T>, resource: T) -> Self { | ||
let discriminant = *resource_pool.discriminant.lock().unwrap(); | ||
Self { | ||
resource_pool, | ||
discriminant, | ||
resource: Some(resource), | ||
} | ||
} | ||
|
||
/// Get the discriminant of the resource pool item | ||
pub fn discriminant(&self) -> u64 { | ||
self.discriminant | ||
} | ||
|
||
/// Get a reference to the inner resource | ||
pub fn resource(&self) -> &T { | ||
self.resource.as_ref().unwrap() | ||
} | ||
|
||
/// Take the inner resource | ||
pub fn into_inner(&mut self) -> T { | ||
self.resource.take().unwrap() | ||
} | ||
} | ||
|
||
impl<T: Send + Sync> Deref for ResourcePoolItem<'_, T> { | ||
type Target = T; | ||
|
||
fn deref(&self) -> &T { | ||
self.resource.as_ref().unwrap() | ||
} | ||
} | ||
|
||
impl<T: Send + Sync> DerefMut for ResourcePoolItem<'_, T> { | ||
fn deref_mut(&mut self) -> &mut T { | ||
self.resource.as_mut().unwrap() | ||
} | ||
} | ||
|
||
impl<T: Send + Sync> Drop for ResourcePoolItem<'_, T> { | ||
fn drop(&mut self) { | ||
let resource = self.into_inner(); | ||
self.resource_pool | ||
.return_resource(resource, self.discriminant); | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::time::Duration; | ||
|
||
use super::*; | ||
|
||
#[test] | ||
fn test_resource_pool_acquire_returns_resource_when_available() { | ||
let pool_size = 10; | ||
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect(); | ||
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone()); | ||
|
||
let mut resources_items = vec![]; | ||
for _ in 0..pool_size { | ||
let resource_item = pool | ||
.acquire_resource_with_timeout(Duration::from_millis(1000)) | ||
.unwrap(); | ||
resources_items.push(resource_item); | ||
} | ||
let resources_result = resources_items | ||
.iter_mut() | ||
.map(|resource_item| resource_item.resource().to_owned()) | ||
.collect::<Vec<_>>(); | ||
|
||
assert_eq!(resources_expected, resources_result); | ||
assert_eq!(pool.count(), 0); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_resource_pool_acquire_locks_when_no_resource_available() { | ||
let pool_size = 10; | ||
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect(); | ||
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone()); | ||
|
||
let mut resources_items = vec![]; | ||
for _ in 0..pool_size { | ||
let resource_item = pool | ||
.acquire_resource_with_timeout(Duration::from_millis(1000)) | ||
.unwrap(); | ||
resources_items.push(resource_item); | ||
} | ||
|
||
assert!(pool | ||
.acquire_resource_with_timeout(Duration::from_millis(1000)) | ||
.is_err()); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_resource_pool_drains_successfully() { | ||
let pool_size = 10; | ||
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect(); | ||
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone()); | ||
assert_eq!(pool.count(), pool_size); | ||
|
||
pool.drain(); | ||
|
||
assert_eq!(pool.count(), 0); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_resource_pool_returns_fresh_resource() { | ||
let pool_size = 10; | ||
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect(); | ||
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone()); | ||
assert_eq!(pool.count(), pool_size); | ||
|
||
let mut resource_item = pool | ||
.acquire_resource_with_timeout(Duration::from_millis(1000)) | ||
.unwrap(); | ||
assert_eq!(pool.count(), pool_size - 1); | ||
pool.return_resource(resource_item.into_inner(), pool.discriminant()); | ||
|
||
assert_eq!(pool.count(), pool_size); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_resource_pool_returns_automatically_resource() { | ||
let pool_size = 10; | ||
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect(); | ||
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone()); | ||
assert_eq!(pool.count(), pool_size); | ||
|
||
{ | ||
let _resource_item = pool | ||
.acquire_resource_with_timeout(Duration::from_millis(1000)) | ||
.unwrap(); | ||
assert_eq!(pool.count(), pool_size - 1); | ||
} | ||
|
||
assert_eq!(pool.count(), pool_size); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_resource_pool_does_not_return_resource_when_pool_is_full() { | ||
let pool_size = 10; | ||
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect(); | ||
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone()); | ||
assert_eq!(pool.count(), pool_size); | ||
|
||
pool.return_resource("resource".to_string(), pool.discriminant()); | ||
|
||
assert_eq!(pool.count(), pool_size); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_resource_pool_does_not_return_stale_resource() { | ||
let pool_size = 10; | ||
let resources_expected: Vec<String> = (0..pool_size).map(|i| i.to_string()).collect(); | ||
let pool = ResourcePool::<String>::new(pool_size, resources_expected.clone()); | ||
assert_eq!(pool.count(), pool_size); | ||
|
||
let mut resource_item = pool | ||
.acquire_resource_with_timeout(Duration::from_millis(1000)) | ||
.unwrap(); | ||
assert_eq!(pool.count(), pool_size - 1); | ||
let discriminant_stale = pool.discriminant(); | ||
pool.set_discriminant(pool.discriminant() + 1); | ||
pool.return_resource(resource_item.into_inner(), discriminant_stale); | ||
|
||
assert_eq!(pool.count(), pool_size - 1); | ||
} | ||
} |