Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blocking progress mode to Python async #116

Merged
merged 39 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6dae328
Expose `ucxx::Worker` epoll file descriptor getter
pentschev Nov 2, 2023
155ab58
Add blocking progress mode to Python async
pentschev Nov 2, 2023
1c1ab87
Test blocking mode in CI
pentschev Nov 2, 2023
e5f4a40
Disable Python future on `blocking` mode testing
pentschev Nov 6, 2023
7952ef1
Merge remote-tracking branch 'upstream/branch-0.36' into python-async…
pentschev Jan 15, 2024
edd3192
Add timeout to Python's async blocking progress mode
pentschev Jan 17, 2024
e3f9cc3
Support blocking mode in 'send_recv` Python benchmark
pentschev Jan 17, 2024
b5f95f0
Schedule cancelation in `ProgressTask` deleter
pentschev Jan 17, 2024
60e49d1
Rerun CI
pentschev Jan 17, 2024
91ab7bf
Revert accidental CI script changes
pentschev Jan 17, 2024
26480a5
Disable blocking progress mode delayed submission benchmarks
pentschev Jan 18, 2024
6da6c5b
Merge remote-tracking branch 'upstream/branch-0.36' into python-async…
pentschev Jan 18, 2024
80d7e14
Merge remote-tracking branch 'upstream/branch-0.36' into python-async…
pentschev Jan 30, 2024
2b7c4cf
Merge remote-tracking branch 'upstream/branch-0.36' into python-async…
pentschev Jan 30, 2024
77b3659
Remove `pytest.mark.gpu`
pentschev Jan 30, 2024
1e3c55e
Merge branch 'branch-0.40' into python-async-blocking-mode
pentschev Jul 25, 2024
755bc97
Merge remote-tracking branch 'origin/python-async-blocking-mode' into…
pentschev Jul 25, 2024
e22b227
Re-enable blocking Distributed tests in CI
pentschev Jul 25, 2024
c6f9654
Merge branch 'branch-0.40' into python-async-blocking-mode
pentschev Jul 25, 2024
d4a8795
Merge remote-tracking branch 'upstream/branch-0.40' into python-async…
pentschev Sep 10, 2024
71d3697
Merge remote-tracking branch 'origin/python-async-blocking-mode' into…
pentschev Sep 10, 2024
c800ca4
Fix progress timeout and docstrings
pentschev Sep 30, 2024
18e3cf0
Cancel progress tasks before closing of event loop
pentschev Sep 30, 2024
2ffac66
Merge remote-tracking branch 'upstream/branch-0.41' into python-async…
pentschev Sep 30, 2024
c5c2ceb
Use `partial` to rewrite `event_loop.close`
pentschev Oct 1, 2024
279cb4c
Reset `self.asyncio_task` to `None` after cancellation
pentschev Oct 1, 2024
c624898
Fix comments' phrasing
pentschev Oct 1, 2024
5769f31
Cancel `_arm_worker` instead of `sock_recv`
pentschev Oct 1, 2024
4f7a7f2
Merge branch 'branch-0.41' into python-async-blocking-mode
pentschev Oct 4, 2024
23bb0bb
Merge branch 'branch-0.41' into python-async-blocking-mode
pentschev Oct 4, 2024
01cbe8a
Merge branch 'branch-0.41' into python-async-blocking-mode
pentschev Oct 7, 2024
50d3f47
Merge remote-tracking branch 'upstream/branch-0.41' into python-async…
pentschev Oct 18, 2024
dbb6386
Revert "Resolve thread-safety issues in distributed-ucxx (#295)"
pentschev Oct 18, 2024
8e4bc38
Adjust properties and blocking progress mode initialization
pentschev Oct 22, 2024
a806e45
Merge remote-tracking branch 'origin/python-async-blocking-mode' into…
pentschev Oct 22, 2024
9e2d017
Fix unreachable test
pentschev Oct 22, 2024
94d05e9
Merge branch 'branch-0.41' into python-async-blocking-mode
pentschev Oct 22, 2024
caf67f9
Ensure writer is closed to prevent Distributed check failure
pentschev Oct 22, 2024
74913fa
Merge remote-tracking branch 'origin/python-async-blocking-mode' into…
pentschev Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,28 @@ rapids-logger "Python Async Tests"
# run_tests_async PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE SKIP
run_tests_async thread 0 0 0
run_tests_async thread 1 1 0
run_tests_async blocking 0 0 0

rapids-logger "Python Benchmarks"
# run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW
run_py_benchmark ucxx-core thread 0 0 0 1 0
run_py_benchmark ucxx-core thread 1 0 0 1 0

for nbuf in 1 8; do
if [[ ! $RAPIDS_CUDA_VERSION =~ 11.2.* ]]; then
# run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW
run_py_benchmark ucxx-async thread 0 0 0 ${nbuf} 0
run_py_benchmark ucxx-async thread 0 0 1 ${nbuf} 0
run_py_benchmark ucxx-async thread 0 1 0 ${nbuf} 0
run_py_benchmark ucxx-async thread 0 1 1 ${nbuf} 0
fi
for progress_mode in "blocking" "thread"; do
for nbuf in 1 8; do
if [[ ! $RAPIDS_CUDA_VERSION =~ 11.2.* ]]; then
# run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW
run_py_benchmark ucxx-async ${progress_mode} 0 0 0 ${nbuf} 0
run_py_benchmark ucxx-async ${progress_mode} 0 0 1 ${nbuf} 0
run_py_benchmark ucxx-async ${progress_mode} 0 1 0 ${nbuf} 0
run_py_benchmark ucxx-async ${progress_mode} 0 1 1 ${nbuf} 0
fi
done
done

rapids-logger "Distributed Tests"
# run_distributed_ucxx_tests PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE
run_distributed_ucxx_tests blocking 0 0
run_distributed_ucxx_tests polling 0 0
run_distributed_ucxx_tests thread 0 0
run_distributed_ucxx_tests thread 0 1
Expand Down
17 changes: 17 additions & 0 deletions cpp/include/ucxx/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,23 @@ class Worker : public Component {
*/
void initBlockingProgressMode();

/**
* @brief Get the epoll file descriptor associated with the worker.
*
* Get the epoll file descriptor associated with the worker when running in blocking mode.
* The worker only has an associated epoll file descriptor after
* `initBlockingProgressMode()` is executed.
*
* The file descriptor is destroyed as part of the `ucxx::Worker` destructor, thus any
* reference to it shall not be used after that.
wence- marked this conversation as resolved.
Show resolved Hide resolved
*
* @throws std::runtime_error if `initBlockingProgressMode()` was not executed to run the
* worker in blocking progress mode.
*
* @returns the file descriptor.
*/
int getEpollFileDescriptor();

/**
* @brief Arm the UCP worker.
*
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ void Worker::initBlockingProgressMode()
if (err != 0) throw std::ios_base::failure(std::string("epoll_ctl() returned " + err));
}

int Worker::getEpollFileDescriptor()
{
if (_epollFileDescriptor == 0)
throw std::runtime_error("Worker not running in blocking progress mode");

return _epollFileDescriptor;
}

bool Worker::arm()
{
ucs_status_t status = ucp_worker_arm(_handle);
Expand Down
17 changes: 17 additions & 0 deletions python/ucxx/_lib/libucxx.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,23 @@ cdef class UCXWorker():
with nogil:
self._worker.get().initBlockingProgressMode()

def arm(self):
cdef bint armed

with nogil:
armed = self._worker.get().arm()

return armed

@property
def epoll_file_descriptor(self):
cdef int epoll_file_descriptor = 0

with nogil:
epoll_file_descriptor = self._worker.get().getEpollFileDescriptor()

return epoll_file_descriptor

def progress(self):
with nogil:
self._worker.get().progress()
Expand Down
2 changes: 2 additions & 0 deletions python/ucxx/_lib/ucxx_api.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ cdef extern from "<ucxx/api.h>" namespace "ucxx" nogil:
uint16_t port, ucp_listener_conn_callback_t callback, void *callback_args
) except +raise_py_error
void initBlockingProgressMode() except +raise_py_error
int getEpollFileDescriptor()
bint arm() except +raise_py_error
void progress()
bint progressOnce()
void progressWorkerEvent(int epoll_timeout)
Expand Down
11 changes: 8 additions & 3 deletions python/ucxx/_lib_async/application_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ucxx._lib.arr import Array
from ucxx.exceptions import UCXMessageTruncatedError

from .continuous_ucx_progress import PollingMode, ThreadMode
from .continuous_ucx_progress import BlockingMode, PollingMode, ThreadMode
from .endpoint import Endpoint
from .exchange_peer_info import exchange_peer_info
from .listener import ActiveClients, Listener, _listener_handler
Expand Down Expand Up @@ -60,6 +60,9 @@ def __init__(
enable_python_future=enable_python_future,
)

if self.progress_mode == "blocking":
self.worker.init_blocking_progress_mode()

self.start_notifier_thread()

weakref.finalize(self, self.progress_tasks.clear)
Expand All @@ -77,7 +80,7 @@ def _check_progress_mode(progress_mode):
else:
progress_mode = "thread"

valid_progress_modes = ["polling", "thread", "thread-polling"]
valid_progress_modes = ["blocking", "polling", "thread", "thread-polling"]
if not isinstance(progress_mode, str) or not any(
progress_mode == m for m in valid_progress_modes
):
Expand Down Expand Up @@ -107,7 +110,7 @@ def _check_enable_delayed_submission(enable_delayed_submission, progress_mode):
and explicit_enable_delayed_submission
):
raise ValueError(
f"Delayed submission requested, but {progress_mode} does not "
f"Delayed submission requested, but '{progress_mode}' mode does not "
"support it, 'thread' or 'thread-polling' progress mode required."
)

Expand Down Expand Up @@ -400,6 +403,8 @@ def continuous_ucx_progress(self, event_loop=None):
task = ThreadMode(self.worker, loop, polling_mode=True)
elif self.progress_mode == "polling":
task = PollingMode(self.worker, loop)
elif self.progress_mode == "blocking":
task = BlockingMode(self.worker, loop, self.worker.epoll_file_descriptor)

self.progress_tasks.append(task)

Expand Down
49 changes: 49 additions & 0 deletions python/ucxx/_lib_async/continuous_ucx_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@


import asyncio
import socket
import weakref


class ProgressTask(object):
Expand Down Expand Up @@ -70,3 +72,50 @@ async def _progress_task(self):
worker.progress()
# Give other co-routines a chance to run.
await asyncio.sleep(0)


class BlockingMode(ProgressTask):
def __init__(self, worker, event_loop, epoll_fd):
super().__init__(worker, event_loop)

# Creating a job that is ready straightaway but with low priority.
# Calling `await self.event_loop.sock_recv(self.rsock, 1)` will
# return when all non-IO tasks are finished.
# See <https://stackoverflow.com/a/48491563>.
wence- marked this conversation as resolved.
Show resolved Hide resolved
self.rsock, wsock = socket.socketpair()
self.rsock.setblocking(0)
wsock.setblocking(0)
wsock.close()

# Bind an asyncio reader to a UCX epoll file descripter
event_loop.add_reader(epoll_fd, self._fd_reader_callback)

# Remove the reader and close socket on finalization
weakref.finalize(self, event_loop.remove_reader, epoll_fd)
weakref.finalize(self, self.rsock.close)

def _fd_reader_callback(self):
self.worker.progress()

# Notice, we can safely overwrite `self.dangling_arm_task`
# since previous arm task is finished by now.
assert self.asyncio_task is None or self.asyncio_task.done()
self.asyncio_task = self.event_loop.create_task(self._arm_worker())

async def _arm_worker(self):
# When arming the worker, the following must be true:
# - No more progress in UCX (see doc of ucp_worker_arm())
# - All asyncio tasks that isn't waiting on UCX must be executed
# so that the asyncio's next state is epoll wait.
# See <https://github.com/rapidsai/ucx-py/issues/413>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I think I understand the constraint, I am not sure what it means for the "next state" to be epoll_wait. Surely there can be arbitrary non-ucx tasks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna be honest and say my understanding here is also a bit fuzzy, and as you noted yourself this "doesn't work" (except when it does), with the original being adapted from https://stackoverflow.com/a/48491563. In my understanding, what epoll_wait refers to here is the socket state, which is there solely to provide a mechanism to prevent asyncio from running out of "ready" tasks, so epoll_wait will ensure that if nothing useful happens in the event loop, asyncio will still be awaken at some point to allow the loop to reevaluate.

while True:
self.worker.progress()

# This IO task returns when all non-IO tasks are finished.
# Notice, we do NOT hold a reference to `worker` while waiting.
await self.event_loop.sock_recv(self.rsock, 1)

if self.worker.arm():
# At this point we know that asyncio's next state is
# epoll wait.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who does the epoll_wait call?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per my #116 (comment), I believe this is related to the sockets.

break
Loading