Use a thread pool when installing dependencies #15
blambright wants to merge 2 commits into themill:master from
Conversation
buddly27 left a comment
Thanks for the PR Brett! I'm not very familiar with the ThreadPool, but reading your code made me want to use it more often :)
I just think the initializer needs to be in a different function for better separation of concerns, but otherwise I like the logic. Just a few comments to address, and unit tests to update. Integration & system tests are passing so this is a good sign!
    installed_requests.add(request)
    mutex = multiprocessing.Lock()

    def _worker_execute(first=False, _overwrite=False):
I think it would be better to get this function out of the install() function for better readability. Maybe it should be renamed to something like _install_in_thread() and take the queue as an argument? Also renaming _install() to _install_single() will help make sense of all of this. It would be something like:
def install(requests, output_path, ...):
"""Install packages to *output_path* from *requests*."""
...
def _install_in_thread(queue, ...):
"""Install package from requests and update queue."""
...
def _install_single(request, output_path, context_mapping, definition_mapping, ...):
"""Install single package to *output_path* from *request*."""
    ...
Hey Jeremy! I was hoping you would see my Teams message before seeing this pull request. This was more just a proof of concept rather than a legit implementation. I'm fine with any changes like this.
    ):
        try:
            # Keep alive if queue is empty but other installs are active.
            if queue.empty():
Is that condition necessary? Why keep one thread alive if other threads are active?
Basically I needed a way to keep all threads alive and waiting even if there was only one active install. Once the initializer function exits, the thread is gone. Since a library's dependencies only get added to the queue after its pip install finishes, the queue is often momentarily empty even though it won't stay empty for long, and that would cause all of the threads to exit. What was happening in practice was that sometimes all but one thread would exit, leaving a single thread to do the rest of the installs in a linear fashion.
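A minimal, self-contained sketch of that keep-alive pattern, assuming a hypothetical dependency graph and a simulated install step (the real code calls pip; names like DEPENDENCIES and installed are illustrative only):

```python
import time
from multiprocessing.pool import ThreadPool
from queue import Empty, Queue
from threading import Lock

# Hypothetical dependency graph: "foo-dep" only becomes known after "foo"
# is installed, so the queue is briefly empty while "foo" is in flight.
DEPENDENCIES = {"foo": ["foo-dep"], "bar": []}

mutex = Lock()
active_count = [0]   # number of installs currently in flight
installed = []

def _worker_execute(queue):
    """Keep the thread alive while the queue is empty but installs are active."""
    while True:
        try:
            request = queue.get_nowait()
        except Empty:
            with mutex:
                if active_count[0] == 0:
                    return        # nothing queued and nobody producing more
            time.sleep(0.05)      # an active install may still add dependencies
            continue

        with mutex:
            active_count[0] += 1
        try:
            time.sleep(0.05)      # stand-in for the actual pip install
            installed.append(request)
            for dependency in DEPENDENCIES.get(request, []):
                queue.put(dependency)
        finally:
            with mutex:
                active_count[0] -= 1

queue = Queue()
for request in ["foo", "bar"]:
    queue.put(request)

# The initializer runs in each worker thread; threads exit on their own
# once the queue is drained and no install is active.
pool = ThreadPool(2, _worker_execute, (queue,))
pool.close()
pool.join()
```

Without the active_count check, a thread that sees an empty queue while "foo" is still installing would exit before "foo-dep" ever arrives.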
                time.sleep(0.5)
                continue

            with mutex:
Just to be sure that I understand properly: the active_count value ensures that the process doesn't finish accidentally after popping the last item from the queue but before processing it, right?
The active_count was a way for all the threads to communicate that they should stay alive until all installs have finished. The queue grows slowly, which causes it to go empty at times.
        stderr=subprocess.PIPE,
-       env=environ_mapping
+       env=environ_mapping,
+       close_fds=True
Why is that necessary? Is that because you risk a "Too many open files" error?
For some reason the output parsing that grabs the request from the stdout would fail: the stdout would be empty even though the pip install was successful. I've had something like this happen to me before, and I knew that close_fds=True would fix it. If you remove it, you'll see what happens.
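For reference, a self-contained sketch of the Popen call shape in question; the command is a stand-in for the pip invocation, and the printed line merely mimics pip's output. (Note that close_fds already defaults to True on Python 3; passing it explicitly matters mostly on Python 2, which the six usage in this codebase suggests.)

```python
import subprocess
import sys

# Stand-in for the pip invocation. close_fds=True stops the child from
# inheriting file descriptors opened by other threads, which matches the
# empty-stdout symptom described above.
process = subprocess.Popen(
    [sys.executable, "-c", "print('Successfully installed foo-0.1.0')"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    close_fds=True,
)
stdout, _ = process.communicate()
print(stdout.decode().strip())
```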
    queue.put((request, None, editable_mode))

    # Install the initial requests and collect dependencies.
    overwrite = _worker_execute(first=True, _overwrite=overwrite)
Do you think it would be possible to avoid calling _worker_execute twice? What if you fill up the queue and pass it to the ThreadPool initializer?
queue = six.moves.queue.Queue()

for request in requests:
    queue.put((request, None, editable_mode))

pool = multiprocessing.pool.ThreadPool(
    THREAD_COUNT, _worker_execute,
    (queue, overwrite if overwrite is not None else False)
)
This solved two things:
- Installing the initial requests on the main thread allows a user to respond to the overwrite prompt if it shows up. The "yes to all" answer is then respected for the dependencies.
- The queue is already full of dependencies by the time the thread pool starts. I think that was only an issue before I added the active_count variable.
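A sketch of that two-phase flow, with the prompt on the main thread before the pool starts; _prompt_overwrite, the "-dep" suffix, and the worker body are all illustrative stand-ins, not the real implementation:

```python
from multiprocessing.pool import ThreadPool
from queue import Empty, Queue

def _prompt_overwrite(request):
    # Stand-in for an interactive prompt; it must run on the main thread
    # so the user can answer before worker threads start.
    return True  # pretend the user answered "yes to all"

requests = ["foo", "bar"]
queue = Queue()
installed = []
overwrite = None

# Phase 1: install the initial requests on the main thread, collecting
# dependencies and the user's overwrite choice.
for request in requests:
    if overwrite is None:
        overwrite = _prompt_overwrite(request)
    installed.append(request)       # stand-in for the pip install
    queue.put(request + "-dep")     # hypothetical dependency discovered

# Phase 2: the pool drains the already-populated queue; the overwrite
# answer from phase 1 is passed along so no thread has to prompt.
def _worker_execute(queue, overwrite):
    while True:
        try:
            installed.append(queue.get_nowait())
        except Empty:
            return

pool = ThreadPool(2, _worker_execute, (queue, overwrite))
pool.close()
pool.join()
```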
    # Use a thread pool to install remaining requests.
    pool = multiprocessing.pool.ThreadPool(
        THREAD_COUNT, _worker_execute,
        (False, overwrite if overwrite is not None else False)
Wouldn't it be safer to have "overwrite" be a multiprocessing.Value instance? Then you could just mutate it in a thread without having to return it.
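A minimal sketch of that idea: multiprocessing.Value also works across threads, so any worker can flip the shared flag in place instead of returning it. The worker body here is hypothetical; the real code would set the flag when the user answers the overwrite prompt.

```python
import ctypes
import multiprocessing
from multiprocessing.pool import ThreadPool

# Shared flag; no need to return it from the worker.
overwrite = multiprocessing.Value(ctypes.c_bool, False)

def _worker_execute(overwrite):
    # Hypothetical: the first worker that hits an existing package records
    # the user's "yes to all" answer for every other thread to see.
    with overwrite.get_lock():
        if not overwrite.value:
            overwrite.value = True

pool = ThreadPool(2, _worker_execute, (overwrite,))
pool.close()
pool.join()
print(bool(overwrite.value))
```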