From c7524f2624d4de87b9569ee35c5afb0bc31b6f9b Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 7 Aug 2023 19:49:57 -0700 Subject: [PATCH] Also modify semaphore and once_cell Signed-off-by: John Nunley --- src/once_cell.rs | 8 ++++---- src/semaphore.rs | 35 +++++++++++++++-------------------- 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/src/once_cell.rs b/src/once_cell.rs index 9a2a0ab..42348f8 100644 --- a/src/once_cell.rs +++ b/src/once_cell.rs @@ -85,12 +85,15 @@ pub struct OnceCell { /// /// These are the users of get_or_init() and similar functions. active_initializers: Event, + /// Listeners waiting for the cell to be initialized. /// /// These are the users of wait(). passive_waiters: Event, + /// State associated with the cell. state: AtomicUsize, + /// The value of the cell. value: UnsafeCell>, } @@ -583,7 +586,6 @@ impl OnceCell { ) -> Result<(), E> { // The event listener we're currently waiting on. let event_listener = EventListener::new(&self.active_initializers); - let mut listening = false; pin!(event_listener); let mut closure = Some(closure); @@ -603,12 +605,10 @@ impl OnceCell { // but we do not have the ability to initialize it. // // We need to wait the initialization to complete. - if listening { + if event_listener.is_listening() { strategy.wait(event_listener.as_mut()).await; - listening = false; } else { event_listener.as_mut().listen(); - listening = true; } } State::Uninitialized => { diff --git a/src/semaphore.rs b/src/semaphore.rs index e1fe6df..6d0690e 100644 --- a/src/semaphore.rs +++ b/src/semaphore.rs @@ -86,7 +86,7 @@ impl Semaphore { pub fn acquire(&self) -> Acquire<'_> { Acquire { semaphore: self, - listener: None, + listener: EventListener::new(&self.event), } } @@ -176,15 +176,16 @@ impl Semaphore { } } -/// The future returned by [`Semaphore::acquire`]. -pub struct Acquire<'a> { - /// The semaphore being acquired. - semaphore: &'a Semaphore, +pin_project_lite::pin_project! { + /// The future returned by [`Semaphore::acquire`]. + pub struct Acquire<'a> { + // The semaphore being acquired. + semaphore: &'a Semaphore, - /// The listener waiting on the semaphore. - /// - /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. - listener: Option>>, + // The listener waiting on the semaphore. + #[pin] + listener: EventListener, + } } impl fmt::Debug for Acquire<'_> { @@ -193,27 +194,21 @@ impl fmt::Debug for Acquire<'_> { } } -impl Unpin for Acquire<'_> {} - impl<'a> Future for Acquire<'a> { type Output = SemaphoreGuard<'a>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let mut this = self.project(); loop { match this.semaphore.try_acquire() { Some(guard) => return Poll::Ready(guard), None => { // Wait on the listener. - match &mut this.listener { - None => { - this.listener = Some(this.semaphore.event.listen()); - } - Some(ref mut listener) => { - ready!(Pin::new(listener).poll(cx)); - this.listener = None; - } + if !this.listener.is_listening() { + this.listener.as_mut().listen(); + } else { + ready!(this.listener.as_mut().poll(cx)); } } }