Use a thread pool when installing dependencies #15

Open

blambright wants to merge 2 commits into themill:master from blambright:master

Conversation

@blambright

No description provided.

@buddly27 (Contributor) left a comment

Thanks for the PR Brett! I'm not very familiar with the ThreadPool, but reading your code made me want to use it more often :)

I just think the initializer needs to be in a different function for better separation of concerns, but otherwise I like the logic. Just a few comments to address, and unit tests to update. Integration & system tests are passing, so that's a good sign!

installed_requests.add(request)
mutex = multiprocessing.Lock()

def _worker_execute(first=False, _overwrite=False):
buddly27 (Contributor):

I think it would be better to move this function out of the install() function for better readability. Maybe it should be renamed to something like _install_in_thread() and take the queue as an argument? Also, renaming _install() to _install_single() will help make sense of all of this. It would be something like:

def install(requests, output_path, ...):
    """Install packages to *output_path* from *requests*."""
    ...

def _install_in_thread(queue, ...):
    """Install package from requests and update queue."""
    ...

def _install_single(request, output_path, context_mapping, definition_mapping, ...):
    """Install single package to *output_path* from *request*."""
    ...

blambright (Author):

Hey Jeremy! I was hoping you would see my Teams message before seeing this pull request. This was more a proof of concept than a legit implementation. I'm fine with any changes like this.

):
try:
# Keep alive if queue is empty but other installs are active.
if queue.empty():
buddly27 (Contributor):

Is that condition necessary? Why keep a thread alive if other threads are active?

blambright (Author):

Basically I needed a way to keep all threads alive and waiting even if there was only one install active. Once the initializer function exits, the thread is gone. Since a library's dependencies only get added to the queue after its pip install finishes, all of the threads would exit because the queue was momentarily empty, even though it wouldn't stay empty for long. What was happening was that sometimes all but one thread would exit, leaving a single thread to do the rest of the installs in a linear fashion.
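
Roughly, the keep-alive amounts to this loop (a simplified sketch with illustrative names, not the exact code from the diff; _installs_active() is a hypothetical stand-in for the shared active-install counter discussed below):

import time

def _worker_execute(task_queue):
    while True:
        if task_queue.empty():
            # Exit only when no worker is mid-install; an active install
            # may still enqueue newly discovered dependencies.
            if _installs_active() == 0:   # hypothetical helper
                return
            time.sleep(0.5)   # stay alive; the queue may refill shortly
            continue
        ...  # pop the next request off the queue and install it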

time.sleep(0.5)
continue

with mutex:
buddly27 (Contributor):

Just to be sure that I understand properly: the active_count value ensures that the process doesn't finish accidentally after popping the last item off the queue but before processing it, right?

blambright (Author):

The active_count was a way for the threads to communicate that they should stay alive until all installs have finished. The queue grows slowly, causing it to go empty at times.
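
A minimal sketch of that bookkeeping, assuming a shared counter guarded by the lock (names are illustrative, not the exact ones from the diff):

import threading

mutex = threading.Lock()
state = {"active_count": 0}   # shared across all worker threads

def _install_with_bookkeeping(request):
    # Mark an install as in flight so other workers keep waiting even
    # while the queue is momentarily empty.
    with mutex:
        state["active_count"] += 1
    try:
        _install_single(request)   # hypothetical per-request install
    finally:
        with mutex:
            state["active_count"] -= 1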

  stderr=subprocess.PIPE,
- env=environ_mapping
+ env=environ_mapping,
+ close_fds=True
buddly27 (Contributor):

Why is that necessary? Is that because you risk a "Too many open files" error?

blambright (Author):

For some reason the output parsing that grabs the request from the stdout would fail. Basically the stdout would be empty even though the pip install was successful. I've had something like this happen to me before, and I knew that close_fds=True would fix it. If you remove it, you'll see what happens.
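
For background: without close_fds, pipe file descriptors created for one child can be inherited by sibling children spawned concurrently from other threads, which can interfere with reading a child's output. (close_fds=True is already the default on Python 3, but not on Python 2.) A sketch of the call, where the command line and variable names are illustrative stand-ins for the ones in the diff:

import subprocess

process = subprocess.Popen(
    ["pip", "install", "--target", target_path, request],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    env=environ_mapping,
    close_fds=True,   # don't leak open fds into the child process
)
output, errors = process.communicate()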

queue.put((request, None, editable_mode))

# Install the initial requests and collect dependencies.
overwrite = _worker_execute(first=True, _overwrite=overwrite)
buddly27 (Contributor):

Do you think it would be possible to avoid calling _worker_execute twice? What if you filled up the queue and passed it to the ThreadPool initializer?

queue = six.moves.queue.Queue()

for request in requests:
    queue.put((request, None, editable_mode))

pool = multiprocessing.pool.ThreadPool(
    THREAD_COUNT, _worker_execute,
    (queue, overwrite if overwrite is not None else False)
)

blambright (Author):

This solved two things:

  1. Running the initial requests on the main thread lets the user answer the overwrite prompt if it shows up; the "yes to all" choice is then respected for the dependencies.
  2. The queue would be full of dependencies by the time the thread pool started. I think that was only an issue before I added the active_count variable.

# Use a thread pool to install remaining requests.
pool = multiprocessing.pool.ThreadPool(
THREAD_COUNT, _worker_execute,
(False, overwrite if overwrite is not None else False)
buddly27 (Contributor):

Wouldn't it be safer to have overwrite be a multiprocessing.Value instance? Then you could just mutate it in a thread without having to return it.
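
A minimal sketch of that suggestion, assuming a ctypes boolean Value (names illustrative):

import multiprocessing

# Shared flag that any worker can flip in place instead of returning it.
overwrite = multiprocessing.Value("b", 0)   # 0/1 standing in for False/True

def _worker_execute(task_queue, overwrite):
    ...
    with overwrite.get_lock():
        overwrite.value = 1   # e.g. the user chose "yes to all"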

blambright (Author):

Yeah, that sounds fine.
