Skip to content

Commit

Permalink
Also modify semaphore and once_cell
Browse files Browse the repository at this point in the history
Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull committed Aug 8, 2023
1 parent c8eace8 commit c7524f2
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 24 deletions.
8 changes: 4 additions & 4 deletions src/once_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,15 @@ pub struct OnceCell<T> {
///
/// These are the users of get_or_init() and similar functions.
active_initializers: Event,

/// Listeners waiting for the cell to be initialized.
///
/// These are the users of wait().
passive_waiters: Event,

/// State associated with the cell.
state: AtomicUsize,

/// The value of the cell.
value: UnsafeCell<MaybeUninit<T>>,
}
Expand Down Expand Up @@ -583,7 +586,6 @@ impl<T> OnceCell<T> {
) -> Result<(), E> {
// The event listener we're currently waiting on.
let event_listener = EventListener::new(&self.active_initializers);
let mut listening = false;
pin!(event_listener);

let mut closure = Some(closure);
Expand All @@ -603,12 +605,10 @@ impl<T> OnceCell<T> {
// but we do not have the ability to initialize it.
//
// We need to wait the initialization to complete.
if listening {
if event_listener.is_listening() {
strategy.wait(event_listener.as_mut()).await;
listening = false;
} else {
event_listener.as_mut().listen();
listening = true;
}
}
State::Uninitialized => {
Expand Down
35 changes: 15 additions & 20 deletions src/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Semaphore {
pub fn acquire(&self) -> Acquire<'_> {
Acquire {
semaphore: self,
listener: None,
listener: EventListener::new(&self.event),
}
}

Expand Down Expand Up @@ -176,15 +176,16 @@ impl Semaphore {
}
}

/// The future returned by [`Semaphore::acquire`].
pub struct Acquire<'a> {
/// The semaphore being acquired.
semaphore: &'a Semaphore,
pin_project_lite::pin_project! {
/// The future returned by [`Semaphore::acquire`].
pub struct Acquire<'a> {
// The semaphore being acquired.
semaphore: &'a Semaphore,

/// The listener waiting on the semaphore.
///
/// TODO: At the next breaking release, remove the `Pin<Box<>>` and make this type `!Unpin`.
listener: Option<Pin<Box<EventListener>>>,
// The listener waiting on the semaphore.
#[pin]
listener: EventListener,
}
}

impl fmt::Debug for Acquire<'_> {
Expand All @@ -193,27 +194,21 @@ impl fmt::Debug for Acquire<'_> {
}
}

impl Unpin for Acquire<'_> {}

impl<'a> Future for Acquire<'a> {
type Output = SemaphoreGuard<'a>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut this = self.project();

loop {
match this.semaphore.try_acquire() {
Some(guard) => return Poll::Ready(guard),
None => {
// Wait on the listener.
match &mut this.listener {
None => {
this.listener = Some(this.semaphore.event.listen());
}
Some(ref mut listener) => {
ready!(Pin::new(listener).poll(cx));
this.listener = None;
}
if !this.listener.is_listening() {
this.listener.as_mut().listen();
} else {
ready!(this.listener.as_mut().poll(cx));
}
}
}
Expand Down

0 comments on commit c7524f2

Please sign in to comment.