Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Changelog
Unreleased
----------

* Use joblib for robust parallel import scanning.

3.8 (2025-04-11)
----------------

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ authors = [
]
requires-python = ">=3.9"
dependencies = [
"joblib~=1.4",
"typing-extensions>=3.10.0.0",
]
classifiers = [
Expand Down
38 changes: 17 additions & 21 deletions src/grimp/application/usecases.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
"""

from typing import Dict, Sequence, Set, Type, Union, cast, Iterable, Collection
import multiprocessing
import math

import joblib # type: ignore

from ..application.ports import caching
from ..application.ports.filesystem import AbstractFileSystem
from ..application.ports.graph import ImportGraph
Expand All @@ -21,7 +22,7 @@ class NotSupplied:


# This is an arbitrary number, but setting it too low slows down our functional tests considerably.
MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING = 50
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MULTIPLE_PROCESSES instead of MULTIPROCESSING to avoid confusion with multiprocessing module

MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPLE_PROCESSES = 64
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐼🐼🐼 My OCD => 64 is a power of two, just feels nicer

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we should include this change here, at least not without something in the changelog as it could have an impact on end users. On balance I'd prefer to concentrate on fixing the issue without making another change at the same time.



def build_graph(
Expand Down Expand Up @@ -228,19 +229,19 @@ def _create_chunks(module_files: Collection[ModuleFile]) -> tuple[tuple[ModuleFi
module_files_tuple = tuple(module_files)

number_of_module_files = len(module_files_tuple)
n_chunks = _decide_number_of_of_processes(number_of_module_files)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo (should really have put that in a separate commit...)

n_chunks = _decide_number_of_processes(number_of_module_files)
chunk_size = math.ceil(number_of_module_files / n_chunks)

return tuple(
module_files_tuple[i * chunk_size : (i + 1) * chunk_size] for i in range(n_chunks)
)


def _decide_number_of_of_processes(number_of_module_files: int) -> int:
if number_of_module_files < MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING:
# Don't incur the overhead of multiprocessing.
def _decide_number_of_processes(number_of_module_files: int) -> int:
if number_of_module_files < MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPLE_PROCESSES:
# Don't incur the overhead of multiple processes.
return 1
return min(multiprocessing.cpu_count(), number_of_module_files)
return min(joblib.cpu_count(), number_of_module_files)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



def _scan_chunks(
Expand All @@ -257,20 +258,15 @@ def _scan_chunks(
)

number_of_processes = len(chunks)
if number_of_processes == 1:
# No need to spawn a process if there's only one chunk.
[chunk] = chunks
return _scan_chunk(import_scanner, exclude_type_checking_imports, chunk)
Comment on lines -260 to -263
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this special number_of_processes == 1 case anymore - joblib will do this automatically. https://joblib.readthedocs.io/en/stable/generated/joblib.Parallel.html#joblib.Parallel

If 1 is given, no parallel computing code is used at all, and the behavior amounts to a simple python for loop.

else:
with multiprocessing.Pool(number_of_processes) as pool:
imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {}
import_scanning_jobs = pool.starmap(
_scan_chunk,
[(import_scanner, exclude_type_checking_imports, chunk) for chunk in chunks],
)
for chunk_imports_by_module_file in import_scanning_jobs:
imports_by_module_file.update(chunk_imports_by_module_file)
return imports_by_module_file
import_scanning_jobs = joblib.Parallel(n_jobs=number_of_processes)(
joblib.delayed(_scan_chunk)(import_scanner, exclude_type_checking_imports, chunk)
for chunk in chunks
)

imports_by_module_file = {}
for chunk_imports_by_module_file in import_scanning_jobs:
imports_by_module_file.update(chunk_imports_by_module_file)
return imports_by_module_file


def _scan_chunk(
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_build_and_use_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_modules():
}


@patch.object(usecases, "MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING", 0)
@patch.object(usecases, "MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPLE_PROCESSES", 0)
def test_modules_multiprocessing():
"""
This test runs relatively slowly, but it's important we cover the multiprocessing code.
Expand Down
Loading