Replace multiprocessing.ProcessPoolExecutor with asyncio #464
base: master
Conversation
```diff
@@ -74,6 +74,5 @@ def run_tests(command, timeout=None):
         asyncio.set_event_loop_policy(
             asyncio.WindowsProactorEventLoopPolicy())

-    result = asyncio.get_event_loop().run_until_complete(
-        _run_tests(command, timeout))
+    result = asyncio.run(_run_tests(command, timeout))
```
On my reading of the docs, using `run()` instead of `get_event_loop().run_until_complete()` seems quite limiting. `run()` doesn't allow other event loops to be running on the thread. This is probably not an issue for us currently, but I could easily imagine CR using asyncio more in the future, so using `run()` here feels like we're asking for trouble in the future for little or no short-term gain. Is there something I'm missing here?
Just to follow up on this, the docs for `run` say:

> This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.

If nothing else, you're calling it in two places in this PR. This feels like something we need to address.
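For illustration (not part of the PR): the restriction is easy to demonstrate, since `asyncio.run()` always tries to create a fresh event loop and refuses to start when one is already running on the thread.

```python
import asyncio

async def outer():
    # A loop is already running here, so a nested asyncio.run() raises
    # RuntimeError: asyncio.run() cannot be called from a running event loop.
    asyncio.run(asyncio.sleep(0))

asyncio.run(outer())
```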
```diff
-            initargs=(config,)) as pool:
-
-    # pylint: disable=W0511
-    # TODO: This is not optimal. The pending-work iterable could be millions
```
We need to keep this comment, or some version of it. We have a different version of the same problem in your implementation: we build a list whose size is the number of pending work items, something I'd eventually like to avoid. Until we fix this, I'd like to keep a reminder around to think about it.
```python
        on_task_complete(*result)

    tasks = [run_task(work_item) for work_item in pending_work]
    await asyncio.gather(*tasks)
```
I'm a bit concerned about memory usage here. `tasks` is a list of the same size as the number of items in `pending_work`, potentially millions or even billions of items. When we destructure it in the call to `asyncio.gather()`, Python is going to create a complete copy of it as a tuple, essentially doubling the space we need for this function. Is there some way we can tell asyncio to process everything lazily and report when they're all done?
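One possible shape for that (a sketch only, not from the PR; `run_all` and `max_in_flight` are made-up names) is to cap the number of live tasks and pull from `pending_work` lazily:

```python
import asyncio

async def run_all(pending_work, run_task, max_in_flight=100):
    """Consume pending_work lazily, keeping at most max_in_flight tasks alive."""
    in_flight = set()
    for work_item in pending_work:
        if len(in_flight) >= max_in_flight:
            # Block until at least one task finishes before scheduling more.
            done, in_flight = await asyncio.wait(
                in_flight, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                task.result()  # re-raise any exception from the task
        in_flight.add(asyncio.ensure_future(run_task(work_item)))
    if in_flight:
        done, _ = await asyncio.wait(in_flight)
        for task in done:
            task.result()
```

Memory then stays proportional to `max_in_flight` rather than to the length of `pending_work`.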
```python
                work_item)
            on_task_complete(*result)

    tasks = [run_task(work_item) for work_item in pending_work]
```
Isn't this just `pool.map()`? The docs make some claims about `map()` achieving better performance by chunking the input; we should investigate that.
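For comparison, a hedged sketch of the chunked multiprocessing variant (`_init_worker`, `_run_task`, and the `chunksize` value are illustrative, not taken from the PR):

```python
import multiprocessing

def _init_worker(config):
    """Hypothetical per-process initializer, standing in for the PR's real one."""

def _run_task(work_item):
    """Hypothetical work function; returns the args for on_task_complete."""
    return (work_item,)

def execute(pending_work, config, on_task_complete):
    # imap_unordered consumes pending_work lazily and yields results as they
    # finish; chunksize batches items to amortize inter-process overhead.
    with multiprocessing.Pool(initializer=_init_worker,
                              initargs=(config,)) as pool:
        for result in pool.imap_unordered(_run_task, pending_work,
                                          chunksize=64):
            on_task_complete(*result)
```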
Yes, I have replaced the `pending` list with a `tasks` list in memory. Which one is better?
I think there are other problems:
- sqlite does not run under asyncio, so the `pending_work` generator may block all async jobs.
- A deadlock can occur if we need to write results to the db while the `pending_work` db cursor is still open.
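If switching databases is off the table, one partial mitigation for the blocking problem raised here (a sketch; assumes the sqlite3 connection was opened with `check_same_thread=False`, and the query is hypothetical) is to push the blocking fetches onto a thread with `run_in_executor` so the event loop keeps scheduling other jobs:

```python
import asyncio

async def iter_work_items(conn, batch_size=256):
    """Yield work items without blocking the event loop.

    Each blocking sqlite3 call runs in the default thread-pool executor.
    Assumes conn was created with check_same_thread=False.
    """
    loop = asyncio.get_event_loop()
    cursor = await loop.run_in_executor(
        None, conn.execute, "SELECT * FROM work_items")  # hypothetical query
    while True:
        rows = await loop.run_in_executor(None, cursor.fetchmany, batch_size)
        if not rows:
            break
        for row in rows:
            yield row
```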
```diff
@@ -97,21 +101,23 @@ class LocalExecutionEngine(ExecutionEngine):
     "The local-git execution engine."

     def __call__(self, pending_work, config, on_task_complete):
-        with multiprocessing.Pool(
+        asyncio.run(self._execute_pending_works(pending_work, config, on_task_complete))
```
See my other comments about `asyncio.run` below.

It sounds like we're trying to solve two problems at once now. The first problem is to start processing jobs before all work items have been fetched from the database. The second is to avoid needing large memory allocations (e.g. the `tasks` list). You mention an interesting point: sqlite is not using asyncio. Perhaps we can address both of these issues by using a database with an asyncio interface, perhaps even aiosqlite. I would be very interested to see a solution like that, especially if it can solve both of these problems.
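To make that concrete, a minimal sketch of the aiosqlite direction (the table name and query are hypothetical, and it runs tasks sequentially for brevity):

```python
import asyncio
import aiosqlite

async def execute_pending(db_path, run_task, on_task_complete):
    # aiosqlite runs the sqlite calls on a background thread and exposes them
    # through asyncio, so iterating the cursor never blocks the event loop and
    # row fetching interleaves with task execution.
    async with aiosqlite.connect(db_path) as db:
        async with db.execute("SELECT * FROM work_items") as cursor:
            async for work_item in cursor:
                result = await run_task(work_item)
                on_task_complete(*result)
```

Rows stream one at a time, so neither a `pending` list nor a `tasks` list is ever materialized; combined with a bounded-concurrency scheme like the sketch above, this could address both problems.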
The main idea of this PR is to remove `pending = list(pending_work)` with an asyncio version. This allows starting some jobs without waiting for all work items to be fetched from the database.