From d56af9453ac6b33a75446c6db1ea857cafdf8c85 Mon Sep 17 00:00:00 2001 From: Peter Byfield Date: Mon, 14 Apr 2025 15:54:28 +0200 Subject: [PATCH] Use joblib for more robust parallel import scanning https://joblib.readthedocs.io/en/stable/parallel.html Joblib takes care of some things for us. Relevant here: * Robust calculation for number of available CPUs. * Sequential calculation when n_jobs = 1. And likely other minor things I don't even understand. --- CHANGELOG.rst | 2 ++ pyproject.toml | 1 + src/grimp/application/usecases.py | 38 +++++++++----------- tests/functional/test_build_and_use_graph.py | 2 +- 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c9600b81..e27655e7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,6 +5,8 @@ Changelog Unreleased ---------- +* Use joblib for robust parallel import scanning. + 3.8 (2025-04-11) ---------------- diff --git a/pyproject.toml b/pyproject.toml index f8da8904..51abb5bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ authors = [ ] requires-python = ">=3.9" dependencies = [ + "joblib~=1.4", "typing-extensions>=3.10.0.0", ] classifiers = [ diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py index e19bb6e8..00bb4001 100644 --- a/src/grimp/application/usecases.py +++ b/src/grimp/application/usecases.py @@ -3,9 +3,10 @@ """ from typing import Dict, Sequence, Set, Type, Union, cast, Iterable, Collection -import multiprocessing import math +import joblib # type: ignore + from ..application.ports import caching from ..application.ports.filesystem import AbstractFileSystem from ..application.ports.graph import ImportGraph @@ -21,7 +22,7 @@ class NotSupplied: # This is an arbitrary number, but setting it too low slows down our functional tests considerably. 
-MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING = 50 +MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPLE_PROCESSES = 64 def build_graph( @@ -228,7 +229,7 @@ def _create_chunks(module_files: Collection[ModuleFile]) -> tuple[tuple[ModuleFi module_files_tuple = tuple(module_files) number_of_module_files = len(module_files_tuple) - n_chunks = _decide_number_of_of_processes(number_of_module_files) + n_chunks = _decide_number_of_processes(number_of_module_files) chunk_size = math.ceil(number_of_module_files / n_chunks) return tuple( @@ -236,11 +237,11 @@ def _create_chunks(module_files: Collection[ModuleFile]) -> tuple[tuple[ModuleFi ) -def _decide_number_of_of_processes(number_of_module_files: int) -> int: - if number_of_module_files < MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING: - # Don't incur the overhead of multiprocessing. +def _decide_number_of_processes(number_of_module_files: int) -> int: + if number_of_module_files < MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPLE_PROCESSES: + # Don't incur the overhead of multiple processes. return 1 - return min(multiprocessing.cpu_count(), number_of_module_files) + return min(joblib.cpu_count(), number_of_module_files) def _scan_chunks( @@ -257,20 +258,15 @@ def _scan_chunks( ) number_of_processes = len(chunks) - if number_of_processes == 1: - # No need to spawn a process if there's only one chunk. 
- [chunk] = chunks - return _scan_chunk(import_scanner, exclude_type_checking_imports, chunk) - else: - with multiprocessing.Pool(number_of_processes) as pool: - imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} - import_scanning_jobs = pool.starmap( - _scan_chunk, - [(import_scanner, exclude_type_checking_imports, chunk) for chunk in chunks], - ) - for chunk_imports_by_module_file in import_scanning_jobs: - imports_by_module_file.update(chunk_imports_by_module_file) - return imports_by_module_file + import_scanning_jobs = joblib.Parallel(n_jobs=number_of_processes)( + joblib.delayed(_scan_chunk)(import_scanner, exclude_type_checking_imports, chunk) + for chunk in chunks + ) + + imports_by_module_file = {} + for chunk_imports_by_module_file in import_scanning_jobs: + imports_by_module_file.update(chunk_imports_by_module_file) + return imports_by_module_file def _scan_chunk( diff --git a/tests/functional/test_build_and_use_graph.py b/tests/functional/test_build_and_use_graph.py index 76b728e7..1fb235c7 100644 --- a/tests/functional/test_build_and_use_graph.py +++ b/tests/functional/test_build_and_use_graph.py @@ -55,7 +55,7 @@ def test_modules(): } -@patch.object(usecases, "MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING", 0) +@patch.object(usecases, "MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPLE_PROCESSES", 0) def test_modules_multiprocessing(): """ This test runs relatively slowly, but it's important we cover the multiprocessing code.