Add blocking progress mode to Python async #116

Merged
merged 39 commits into from
Oct 22, 2024
Changes from 31 commits
Commits
6dae328
Expose `ucxx::Worker` epoll file descriptor getter
pentschev Nov 2, 2023
155ab58
Add blocking progress mode to Python async
pentschev Nov 2, 2023
1c1ab87
Test blocking mode in CI
pentschev Nov 2, 2023
e5f4a40
Disable Python future on `blocking` mode testing
pentschev Nov 6, 2023
7952ef1
Merge remote-tracking branch 'upstream/branch-0.36' into python-async…
pentschev Jan 15, 2024
edd3192
Add timeout to Python's async blocking progress mode
pentschev Jan 17, 2024
e3f9cc3
Support blocking mode in 'send_recv` Python benchmark
pentschev Jan 17, 2024
b5f95f0
Schedule cancelation in `ProgressTask` deleter
pentschev Jan 17, 2024
60e49d1
Rerun CI
pentschev Jan 17, 2024
91ab7bf
Revert accidental CI script changes
pentschev Jan 17, 2024
26480a5
Disable blocking progress mode delayed submission benchmarks
pentschev Jan 18, 2024
6da6c5b
Merge remote-tracking branch 'upstream/branch-0.36' into python-async…
pentschev Jan 18, 2024
80d7e14
Merge remote-tracking branch 'upstream/branch-0.36' into python-async…
pentschev Jan 30, 2024
2b7c4cf
Merge remote-tracking branch 'upstream/branch-0.36' into python-async…
pentschev Jan 30, 2024
77b3659
Remove `pytest.mark.gpu`
pentschev Jan 30, 2024
1e3c55e
Merge branch 'branch-0.40' into python-async-blocking-mode
pentschev Jul 25, 2024
755bc97
Merge remote-tracking branch 'origin/python-async-blocking-mode' into…
pentschev Jul 25, 2024
e22b227
Re-enable blocking Distributed tests in CI
pentschev Jul 25, 2024
c6f9654
Merge branch 'branch-0.40' into python-async-blocking-mode
pentschev Jul 25, 2024
d4a8795
Merge remote-tracking branch 'upstream/branch-0.40' into python-async…
pentschev Sep 10, 2024
71d3697
Merge remote-tracking branch 'origin/python-async-blocking-mode' into…
pentschev Sep 10, 2024
c800ca4
Fix progress timeout and docstrings
pentschev Sep 30, 2024
18e3cf0
Cancel progress tasks before closing of event loop
pentschev Sep 30, 2024
2ffac66
Merge remote-tracking branch 'upstream/branch-0.41' into python-async…
pentschev Sep 30, 2024
c5c2ceb
Use `partial` to rewrite `event_loop.close`
pentschev Oct 1, 2024
279cb4c
Reset `self.asyncio_task` to `None` after cancellation
pentschev Oct 1, 2024
c624898
Fix comments' phrasing
pentschev Oct 1, 2024
5769f31
Cancel `_arm_worker` instead of `sock_recv`
pentschev Oct 1, 2024
4f7a7f2
Merge branch 'branch-0.41' into python-async-blocking-mode
pentschev Oct 4, 2024
23bb0bb
Merge branch 'branch-0.41' into python-async-blocking-mode
pentschev Oct 4, 2024
01cbe8a
Merge branch 'branch-0.41' into python-async-blocking-mode
pentschev Oct 7, 2024
50d3f47
Merge remote-tracking branch 'upstream/branch-0.41' into python-async…
pentschev Oct 18, 2024
dbb6386
Revert "Resolve thread-safety issues in distributed-ucxx (#295)"
pentschev Oct 18, 2024
8e4bc38
Adjust properties and blocking progress mode initialization
pentschev Oct 22, 2024
a806e45
Merge remote-tracking branch 'origin/python-async-blocking-mode' into…
pentschev Oct 22, 2024
9e2d017
Fix unreachable test
pentschev Oct 22, 2024
94d05e9
Merge branch 'branch-0.41' into python-async-blocking-mode
pentschev Oct 22, 2024
caf67f9
Ensure writer is closed to prevent Distributed check failure
pentschev Oct 22, 2024
74913fa
Merge remote-tracking branch 'origin/python-async-blocking-mode' into…
pentschev Oct 22, 2024
23 changes: 15 additions & 8 deletions ci/test_python.sh
@@ -39,20 +39,27 @@ rapids-logger "Python Async Tests"
# run_py_tests_async PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE SKIP
run_py_tests_async thread 0 0 0
run_py_tests_async thread 1 1 0
run_py_tests_async blocking 0 0 0

rapids-logger "Python Benchmarks"
# run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW
run_py_benchmark ucxx-core thread 0 0 0 1 0
run_py_benchmark ucxx-core thread 1 0 0 1 0

-for nbuf in 1 8; do
-  if [[ ! $RAPIDS_CUDA_VERSION =~ 11.2.* ]]; then
-    # run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW
-    run_py_benchmark ucxx-async thread 0 0 0 ${nbuf} 0
-    run_py_benchmark ucxx-async thread 0 0 1 ${nbuf} 0
-    run_py_benchmark ucxx-async thread 0 1 0 ${nbuf} 0
-    run_py_benchmark ucxx-async thread 0 1 1 ${nbuf} 0
-  fi
+for progress_mode in "blocking" "thread"; do
+  for nbuf in 1 8; do
+    if [[ ! $RAPIDS_CUDA_VERSION =~ 11.2.* ]]; then
+      # run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW
+      run_py_benchmark ucxx-async ${progress_mode} 0 0 0 ${nbuf} 0
+      run_py_benchmark ucxx-async ${progress_mode} 0 0 1 ${nbuf} 0
+      if [[ ${progress_mode} != "blocking" ]]; then
+        # Delayed submission isn't supported by blocking progress mode
+        # run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW
+        run_py_benchmark ucxx-async ${progress_mode} 0 1 0 ${nbuf} 0
+        run_py_benchmark ucxx-async ${progress_mode} 0 1 1 ${nbuf} 0
+      fi
+    fi
+  done
 done
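
The benchmark matrix that the nested loops above produce can be enumerated with a short sketch. This is only an illustration in Python, not part of the CI script: the function name `benchmark_matrix` is hypothetical, and the `RAPIDS_CUDA_VERSION` guard is left out for brevity.

```python
from itertools import product


def benchmark_matrix():
    """Yield (progress_mode, delayed_submission, python_future, nbuf) tuples.

    Hypothetical helper mirroring the loop structure of the CI script;
    delayed submission is skipped for the blocking progress mode.
    """
    for progress_mode, nbuf in product(("blocking", "thread"), (1, 8)):
        for delayed_submission, python_future in product((0, 1), (0, 1)):
            if progress_mode == "blocking" and delayed_submission:
                # Delayed submission isn't supported by blocking progress mode.
                continue
            yield progress_mode, delayed_submission, python_future, nbuf


runs = list(benchmark_matrix())
for run in runs:
    print(run)
```

Under these assumptions the blocking mode contributes two runs per buffer count and the thread mode four, so the async benchmark is invoked twelve times in total.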

rapids-logger "C++ future -> Python future notifier example"
2 changes: 2 additions & 0 deletions ci/test_python_distributed.sh
@@ -33,6 +33,7 @@ print_ucx_config

rapids-logger "Run distributed-ucxx tests with conda package"
# run_distributed_ucxx_tests PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE
run_distributed_ucxx_tests blocking 0 0
run_distributed_ucxx_tests polling 0 0
run_distributed_ucxx_tests thread 0 0
run_distributed_ucxx_tests thread 0 1
@@ -42,6 +43,7 @@ run_distributed_ucxx_tests thread 1 1
install_distributed_dev_mode

# run_distributed_ucxx_tests_internal PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE
run_distributed_ucxx_tests_internal blocking 0 0
run_distributed_ucxx_tests_internal polling 0 0
run_distributed_ucxx_tests_internal thread 0 0
run_distributed_ucxx_tests_internal thread 0 1
17 changes: 17 additions & 0 deletions cpp/include/ucxx/worker.h
@@ -253,6 +253,23 @@ class Worker : public Component {
*/
void initBlockingProgressMode();

/**
* @brief Get the epoll file descriptor associated with the worker.
*
* Get the epoll file descriptor associated with the worker when running in blocking mode.
* The worker only has an associated epoll file descriptor after
* `initBlockingProgressMode()` is executed.
*
* The file descriptor is destroyed as part of the `ucxx::Worker` destructor, thus any
* reference to it shall not be used after that.
*
* @throws std::runtime_error if `initBlockingProgressMode()` was not executed to run the
* worker in blocking progress mode.
*
* @returns the file descriptor.
*/
int getEpollFileDescriptor();

/**
* @brief Arm the UCP worker.
*
8 changes: 8 additions & 0 deletions cpp/src/worker.cpp
@@ -220,6 +220,14 @@ void Worker::initBlockingProgressMode()
}
}

int Worker::getEpollFileDescriptor()
{
if (_epollFileDescriptor == 0)
throw std::runtime_error("Worker not running in blocking progress mode");

return _epollFileDescriptor;
}

bool Worker::arm()
{
ucs_status_t status = ucp_worker_arm(_handle);
17 changes: 17 additions & 0 deletions python/ucxx/ucxx/_lib/libucxx.pyx
@@ -617,6 +617,23 @@ cdef class UCXWorker():
with nogil:
self._worker.get().initBlockingProgressMode()

def arm(self) -> bool:
cdef bint armed

with nogil:
armed = self._worker.get().arm()

return armed

@property
def epoll_file_descriptor(self) -> int:
cdef int epoll_file_descriptor = 0

with nogil:
epoll_file_descriptor = self._worker.get().getEpollFileDescriptor()

return epoll_file_descriptor

def progress(self) -> None:
with nogil:
self._worker.get().progress()
2 changes: 2 additions & 0 deletions python/ucxx/ucxx/_lib/ucxx_api.pxd
@@ -229,6 +229,8 @@ cdef extern from "<ucxx/api.h>" namespace "ucxx" nogil:
uint16_t port, ucp_listener_conn_callback_t callback, void *callback_args
) except +raise_py_error
void initBlockingProgressMode() except +raise_py_error
int getEpollFileDescriptor() except +raise_py_error
bint arm() except +raise_py_error
void progress()
bint progressOnce()
void progressWorkerEvent(int epoll_timeout)
9 changes: 7 additions & 2 deletions python/ucxx/ucxx/_lib_async/application_context.py
@@ -13,7 +13,7 @@
from ucxx.exceptions import UCXMessageTruncatedError
from ucxx.types import Tag

-from .continuous_ucx_progress import PollingMode, ThreadMode
+from .continuous_ucx_progress import BlockingMode, PollingMode, ThreadMode
from .endpoint import Endpoint
from .exchange_peer_info import exchange_peer_info
from .listener import ActiveClients, Listener, _listener_handler
@@ -60,6 +60,9 @@ def __init__(
enable_python_future=self._enable_python_future,
)

if self.progress_mode == "blocking":
self.worker.init_blocking_progress_mode()

self.start_notifier_thread()

weakref.finalize(self, self.progress_tasks.clear)
@@ -82,7 +85,7 @@ def progress_mode(self, progress_mode):
else:
progress_mode = "thread"

-valid_progress_modes = ["polling", "thread", "thread-polling"]
+valid_progress_modes = ["blocking", "polling", "thread", "thread-polling"]
if not isinstance(progress_mode, str) or not any(
progress_mode == m for m in valid_progress_modes
):
@@ -464,6 +467,8 @@ def continuous_ucx_progress(self, event_loop=None):
task = ThreadMode(self.worker, loop, polling_mode=True)
elif self.progress_mode == "polling":
task = PollingMode(self.worker, loop)
elif self.progress_mode == "blocking":
task = BlockingMode(self.worker, loop)

self.progress_tasks[loop] = task

145 changes: 139 additions & 6 deletions python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py
@@ -3,6 +3,12 @@


import asyncio
import socket
import time
import weakref
from functools import partial

from ucxx._lib.libucxx import UCXWorker


class ProgressTask(object):
@@ -24,12 +30,20 @@ def __init__(self, worker, event_loop):
self.event_loop = event_loop
self.asyncio_task = None

-def __del__(self):
-if self.asyncio_task is not None:
-# FIXME: This does not work, the cancellation must be awaited.
-# Running with polling mode will always cause
-# `Task was destroyed but it is pending!` errors at ucxx.reset().
-self.asyncio_task.cancel()
+event_loop_close_original = self.event_loop.close
+
+def _event_loop_close(event_loop_close_original, *args, **kwargs):
+if not self.event_loop.is_closed() and self.asyncio_task is not None:
+try:
+self.asyncio_task.cancel()
+self.event_loop.run_until_complete(self.asyncio_task)

Contributor

suggestion: should we set `self.asyncio_task = None` after running until complete?

Contributor

OK, so the idea is that you don't have control over who is closing the event loop, so you instead hook into close and this task cancels itself during event loop closing/teardown?

Member Author

> suggestion: should we set `self.asyncio_task = None` after running until complete?

That's a good idea, done in 279cb4c.

> OK, so the idea is that you don't have control over who is closing the event loop, so you instead hook into close and this task cancels itself during event loop closing/teardown?

Exactly, there doesn't seem to be another way, since we have no control over whether the user will close the loop before resetting UCXX.

+except asyncio.exceptions.CancelledError:
+pass
+finally:
+self.asyncio_task = None
+event_loop_close_original(*args, **kwargs)
+
+self.event_loop.close = partial(_event_loop_close, event_loop_close_original)
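
The close hook discussed in the review thread can be tried in isolation with a minimal sketch. The names `install_close_hook`, `_forever` and the `holder` dict are hypothetical stand-ins (the PR keeps the task on `self.asyncio_task`), and the exact nesting of the call to the original `close` is an assumption of this sketch rather than a copy of the merged code.

```python
import asyncio
from functools import partial


async def _forever():
    # Stand-in for a progress task that never finishes on its own.
    while True:
        await asyncio.sleep(0)


def install_close_hook(loop, task_holder):
    """Wrap `loop.close` so the pending task is cancelled and awaited first.

    `task_holder` is a hypothetical dict with a single "task" entry.
    """
    original_close = loop.close

    def _close(original_close, *args, **kwargs):
        task = task_holder["task"]
        if not loop.is_closed() and task is not None:
            try:
                task.cancel()
                # Drive the loop until the cancellation has been processed.
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                pass
            finally:
                task_holder["task"] = None
        original_close(*args, **kwargs)

    loop.close = partial(_close, original_close)


loop = asyncio.new_event_loop()
holder = {"task": loop.create_task(_forever())}
install_close_hook(loop, holder)
loop.run_until_complete(asyncio.sleep(0))  # let the task start running
task = holder["task"]
loop.close()  # cancels and awaits the task, then closes the loop
print(loop.is_closed(), task.cancelled())
```

Because the cancellation is awaited through `run_until_complete` before the loop is closed, the task is finished by the time the original `close` runs, which is what the removed `__del__` could not guarantee.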

# Hash and equality is based on the event loop
def __hash__(self):
@@ -70,3 +84,122 @@ async def _progress_task(self):
worker.progress()
# Give other co-routines a chance to run.
await asyncio.sleep(0)


class BlockingMode(ProgressTask):
def __init__(
self,
worker: UCXWorker,
event_loop: asyncio.AbstractEventLoop,
progress_timeout: float = 1.0,
):
"""Progress the UCX worker in blocking mode.

The blocking progress mode ensures the worker is progressed whenever the
UCX worker reports an event on its epoll file descriptor. In certain
circumstances the epoll file descriptor may not present an event, thus
the `progress_timeout` will ensure the UCX worker is progressed to
prevent a potential deadlock.

Parameters
----------
worker: UCXWorker
Worker object from the UCXX Cython API to progress.
event_loop: asyncio.AbstractEventLoop
Asynchronous event loop on which to schedule async tasks.
progress_timeout: float
The time in seconds to sleep before checking again whether the worker should
be progressed.
"""
super().__init__(worker, event_loop)
self._progress_timeout = progress_timeout

# Creating a job that is ready straight away but with low priority.
# Calling `await self.event_loop.sock_recv(self.rsock, 1)` will
# return when all non-IO tasks are finished.
# See <https://stackoverflow.com/a/48491563>.
self.rsock, wsock = socket.socketpair()
self.rsock.setblocking(0)
wsock.setblocking(0)
wsock.close()

epoll_fd = worker.epoll_file_descriptor

# Bind an asyncio reader to a UCX epoll file descriptor
event_loop.add_reader(epoll_fd, self._fd_reader_callback)

# Remove the reader and close socket on finalization
weakref.finalize(self, event_loop.remove_reader, epoll_fd)
weakref.finalize(self, self.rsock.close)

self.blocking_asyncio_task = None
self.last_progress_time = time.monotonic() - self._progress_timeout
self.asyncio_task = event_loop.create_task(self._progress_with_timeout())
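
Binding a callback to a file descriptor with `add_reader`, as the constructor above does for the worker's epoll file descriptor, can be sketched without UCX. Here a plain socket pair stands in for the epoll file descriptor, which is an assumption made only to keep the example self-contained; `add_reader` is available on Unix selector-based event loops.

```python
import asyncio
import socket


async def main():
    loop = asyncio.get_running_loop()
    # Stand-in for the UCX epoll file descriptor: any readable fd works here.
    rsock, wsock = socket.socketpair()
    rsock.setblocking(False)
    events = []
    got_event = asyncio.Event()

    def on_readable():
        # Runs in the event loop thread once `rsock` becomes readable.
        events.append(rsock.recv(16))
        got_event.set()

    loop.add_reader(rsock.fileno(), on_readable)
    try:
        wsock.send(b"ping")
        await asyncio.wait_for(got_event.wait(), timeout=5)
    finally:
        # Mirror the finalizers in the PR: remove the reader, close sockets.
        loop.remove_reader(rsock.fileno())
        rsock.close()
        wsock.close()
    return events


events = asyncio.run(main())
print(events)
```

The callback is a plain function rather than a coroutine, which is why the PR's `_fd_reader_callback` schedules `_arm_worker` as a separate task instead of awaiting it.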

def _fd_reader_callback(self):
"""Schedule new progress task upon worker event.

Schedule new progress task when a new event occurs in the worker's epoll file
descriptor.
"""
self.worker.progress()

# Note that we can safely overwrite `self.blocking_asyncio_task`
# since the previous arm task has finished by now.
assert self.blocking_asyncio_task is None or self.blocking_asyncio_task.done()
self.blocking_asyncio_task = self.event_loop.create_task(self._arm_worker())

async def _arm_worker(self):
"""Progress the worker and rearm.

Progress and rearm the worker to watch for new events on its epoll file
descriptor.
"""
# When arming the worker, the following must be true:
# - No more progress in UCX (see doc of ucp_worker_arm())
# - All asyncio tasks that aren't waiting on UCX must be executed
# so that the asyncio's next state is epoll wait.
# See <https://github.com/rapidsai/ucx-py/issues/413>
while True:
self.last_progress_time = time.monotonic()
self.worker.progress()

# This IO task returns when all non-IO tasks are finished.
# Notice, we do NOT hold a reference to `worker` while waiting.
await self.event_loop.sock_recv(self.rsock, 1)

if self.worker.arm():
# At this point we know that asyncio's next state is
# epoll wait.
break
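
The progress-then-arm loop can be exercised with a stand-in worker. `FakeWorker` below is hypothetical and only imitates the `progress()`/`arm()` interface; its `arm()` reports "still busy" twice before succeeding. The sketch checks only the value returned by `sock_recv` on a socket whose peer has been closed. Whether that await also lets other ready tasks run first depends on the event loop implementation and is not verified here.

```python
import asyncio
import socket


class FakeWorker:
    """Hypothetical stand-in for `UCXWorker`: `arm()` fails twice, then succeeds."""

    def __init__(self, busy_rounds=2):
        self.busy_rounds = busy_rounds
        self.progress_calls = 0

    def progress(self):
        self.progress_calls += 1

    def arm(self):
        if self.busy_rounds > 0:
            self.busy_rounds -= 1
            return False
        return True


async def arm_worker(worker, rsock):
    loop = asyncio.get_running_loop()
    while True:
        worker.progress()
        # The peer of `rsock` is closed, so this read completes with b"" (EOF).
        data = await loop.sock_recv(rsock, 1)
        assert data == b""
        if worker.arm():
            break


async def main():
    rsock, wsock = socket.socketpair()
    rsock.setblocking(False)
    wsock.close()
    worker = FakeWorker()
    try:
        await arm_worker(worker, rsock)
    finally:
        rsock.close()
    return worker.progress_calls


progress_calls = asyncio.run(main())
print(progress_calls)
```

Each failed `arm()` sends the loop back to `progress()`, so the worker is progressed once more than the number of failed arm attempts.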

async def _progress_with_timeout(self):
"""Protect worker from never progressing again.

If no events are raised, the asyncio loop may get stuck waiting, so the
worker must be progressed every so often. This method progresses the worker
regardless of what the epoll file descriptor does whenever more than
`self._progress_timeout` has elapsed since the last check, thus preventing
a deadlock.
"""
while True:
worker = self.worker
if worker is None:
return
if time.monotonic() > self.last_progress_time + self._progress_timeout:
self.last_progress_time = time.monotonic()

# Cancel `_arm_worker` task if available. `loop.sock_recv` does not
# seem to respect timeout with `asyncio.wait_for`, thus we cancel
# it here instead. It will get recreated after a new event on
# `worker.epoll_file_descriptor`.
if self.blocking_asyncio_task is not None:
self.blocking_asyncio_task.cancel()
try:
await self.blocking_asyncio_task
except asyncio.exceptions.CancelledError:
pass

worker.progress()
# Give other co-routines a chance to run.
await asyncio.sleep(self._progress_timeout)
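
The cancel-and-await step used by `_progress_with_timeout` can be shown on its own. In this sketch `stuck` is a hypothetical stand-in for an `_arm_worker` task blocked on I/O that never completes, and `cancel_and_wait` is an illustrative helper name, not something defined in the PR.

```python
import asyncio


async def stuck():
    # Stand-in for an `_arm_worker` task blocked on I/O that never completes.
    await asyncio.Event().wait()


async def cancel_and_wait(task):
    """Cancel `task` and wait until the cancellation has been processed."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


async def main():
    task = asyncio.create_task(stuck())
    await asyncio.sleep(0)  # let the task start and block
    await cancel_and_wait(task)
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled)
```

Awaiting the task after `cancel()` matters: `cancel()` only requests cancellation, and the task is not actually finished until the event loop has delivered the `CancelledError` to it.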
6 changes: 2 additions & 4 deletions python/ucxx/ucxx/benchmarks/send_recv.py
@@ -294,8 +294,8 @@ def parse_args():
parser.add_argument(
"--progress-mode",
default="thread",
-help="Progress mode for the UCP worker. Valid options are: "
-"'thread' (default) and 'blocking'.",
+help="Progress mode for the UCP worker. Valid options are: 'blocking', "
+"'polling', 'thread' and 'thread-polling'. (Default: 'thread')",
type=str,
)
parser.add_argument(
@@ -350,8 +350,6 @@ def parse_args():

if args.progress_mode not in ["blocking", "polling", "thread", "thread-polling"]:
raise RuntimeError(f"Invalid `--progress-mode`: '{args.progress_mode}'")
-if args.progress_mode == "blocking" and args.backend == "ucxx-async":
-raise RuntimeError("Blocking progress mode not supported for ucxx-async yet")
if args.asyncio_wait and not args.progress_mode.startswith("thread"):
raise RuntimeError(
"`--asyncio-wait` requires `--progress-mode=thread` or "
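
The argument checks in the last hunk can be reproduced as a small standalone parser. This is a sketch under assumptions: treating `--asyncio-wait` as a boolean flag and the wording of the error messages are choices made for this example, not taken from the benchmark script.

```python
import argparse

VALID_PROGRESS_MODES = ("blocking", "polling", "thread", "thread-polling")


def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--progress-mode",
        default="thread",
        type=str,
        help="Progress mode for the UCP worker. (Default: 'thread')",
    )
    # Assumption: modelled as a boolean flag for this sketch.
    parser.add_argument("--asyncio-wait", action="store_true")
    args = parser.parse_args(argv)

    if args.progress_mode not in VALID_PROGRESS_MODES:
        raise RuntimeError(f"Invalid `--progress-mode`: '{args.progress_mode}'")
    if args.asyncio_wait and not args.progress_mode.startswith("thread"):
        # Hypothetical message; the original text is truncated in the diff.
        raise RuntimeError("`--asyncio-wait` requires a thread progress mode")
    return args


args = parse_args(["--progress-mode", "blocking"])
print(args.progress_mode)
```

With the `ucxx-async` restriction removed by this PR, `blocking` passes validation for every backend, while combining it with `--asyncio-wait` is still rejected.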