From c4b70537812bf1a9016239a2480853c0424773ac Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 30 Sep 2024 12:48:47 -0500 Subject: [PATCH] Revert "reduce" This reverts commit 39746b627abc345081774c03c4a5acd5466a98f5. --- src/aiohappyeyeballs/_staggered.py | 32 ++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/aiohappyeyeballs/_staggered.py b/src/aiohappyeyeballs/_staggered.py index 85e4189..516d899 100644 --- a/src/aiohappyeyeballs/_staggered.py +++ b/src/aiohappyeyeballs/_staggered.py @@ -2,6 +2,7 @@ import contextlib from typing import ( TYPE_CHECKING, + Any, Awaitable, Callable, Iterable, @@ -22,6 +23,27 @@ def _set_result(wait_next: "asyncio.Future[None]") -> None: wait_next.set_result(None) +async def _wait_one( + futures: "Iterable[asyncio.Future[Any]]", + loop: asyncio.AbstractEventLoop, +) -> _T: + """Wait for the first future to complete.""" + wait_next = loop.create_future() + + def _on_completion(fut: "asyncio.Future[Any]") -> None: + if not wait_next.done(): + wait_next.set_result(fut) + + for f in futures: + f.add_done_callback(_on_completion) + + try: + return await wait_next + finally: + for f in futures: + f.remove_done_callback(_on_completion) + + async def staggered_race( coro_fns: Iterable[Callable[[], Awaitable[_T]]], delay: Optional[float], @@ -116,9 +138,6 @@ async def run_one_coro( start_next: Optional[asyncio.Future[None]] task: asyncio.Task[Optional[Tuple[_T, int]]] done: Union[asyncio.Future[None], asyncio.Task[Optional[Tuple[_T, int]]]] - waiters: Iterable[ - Union[asyncio.Future[None], asyncio.Task[Optional[Tuple[_T, int]]]] - ] coro_iter = iter(coro_fns) this_index = -1 try: @@ -136,12 +155,9 @@ async def run_one_coro( break while tasks: - waiters = [*tasks, start_next] if start_next else tasks - dones, _ = await asyncio.wait( - waiters, - return_when=asyncio.FIRST_COMPLETED, + done = await _wait_one( + [*tasks, start_next] if start_next else tasks, loop ) - done = dones.pop() if done is start_next: # The current task has failed or the timer has expired # so we need to start the next task.