From 65237bd31e99a71fe5707e7c20b36ec75e65c16a Mon Sep 17 00:00:00 2001 From: David Seddon Date: Fri, 11 Apr 2025 10:17:37 +0100 Subject: [PATCH 1/4] Move chunking decision inside _create_chunks --- src/grimp/application/usecases.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py index bb43d7a8..1f58357e 100644 --- a/src/grimp/application/usecases.py +++ b/src/grimp/application/usecases.py @@ -15,8 +15,6 @@ from ..domain.valueobjects import DirectImport, Module from .config import settings -N_CPUS = multiprocessing.cpu_count() - class NotSupplied: pass @@ -217,9 +215,8 @@ def _scan_imports( imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} - n_chunks = min(N_CPUS, len(module_files)) - chunks = _create_chunks(list(module_files), n_chunks=n_chunks) - with multiprocessing.Pool(n_chunks) as pool: + chunks = _create_chunks(module_files) + with multiprocessing.Pool(len(chunks)) as pool: import_scanning_jobs = pool.starmap( _scan_chunk, [(import_scanner, exclude_type_checking_imports, chunk) for chunk in chunks], @@ -230,11 +227,23 @@ def _scan_imports( return imports_by_module_file -def _create_chunks( - module_files: Sequence[ModuleFile], *, n_chunks: int -) -> Iterable[Iterable[ModuleFile]]: - chunk_size = math.ceil(len(module_files) / n_chunks) - return [module_files[i * chunk_size : (i + 1) * chunk_size] for i in range(n_chunks)] +def _create_chunks(module_files: Collection[ModuleFile]) -> tuple[tuple[ModuleFile, ...], ...]: + """ + Split the module files into chunks, each to be worked on by a separate OS process. + """ + module_files_tuple = tuple(module_files) + + number_of_module_files = len(module_files_tuple) + n_chunks = _decide_number_of_of_processes(number_of_module_files) + chunk_size = math.ceil(number_of_module_files / n_chunks) + + return tuple( + module_files_tuple[i * chunk_size : (i + 1) * chunk_size] for i in range(n_chunks) + ) + + +def _decide_number_of_of_processes(number_of_module_files: int) -> int: + return min(multiprocessing.cpu_count(), number_of_module_files) def _scan_chunk( From aa705bc04e6e100d37d21896dd6aa90587bbd678 Mon Sep 17 00:00:00 2001 From: David Seddon Date: Fri, 11 Apr 2025 10:24:51 +0100 Subject: [PATCH 2/4] Extract _scan_chunks function --- src/grimp/application/usecases.py | 50 ++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py index 1f58357e..e2c8b41e 100644 --- a/src/grimp/application/usecases.py +++ b/src/grimp/application/usecases.py @@ -207,24 +207,14 @@ def _scan_imports( include_external_packages: bool, exclude_type_checking_imports: bool, ) -> Dict[ModuleFile, Set[DirectImport]]: - import_scanner: AbstractImportScanner = settings.IMPORT_SCANNER_CLASS( - file_system=file_system, - found_packages=found_packages, - include_external_packages=include_external_packages, - ) - - imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} - chunks = _create_chunks(module_files) - with multiprocessing.Pool(len(chunks)) as pool: - import_scanning_jobs = pool.starmap( - _scan_chunk, - [(import_scanner, exclude_type_checking_imports, chunk) for chunk in chunks], - ) - for chunk_imports_by_module_file in import_scanning_jobs: - imports_by_module_file.update(chunk_imports_by_module_file) - - return imports_by_module_file + return _scan_chunks( + chunks, + file_system, + found_packages, + include_external_packages, + exclude_type_checking_imports, + ) def _create_chunks(module_files: Collection[ModuleFile]) -> tuple[tuple[ModuleFile, ...], ...]: @@ -246,6 +236,32 @@ def _decide_number_of_of_processes(number_of_module_files: int) -> int: return min(multiprocessing.cpu_count(), number_of_module_files) +def _scan_chunks( + chunks: Collection[Collection[ModuleFile]], + file_system: AbstractFileSystem, + found_packages: Set[FoundPackage], + include_external_packages: bool, + exclude_type_checking_imports: bool, +) -> Dict[ModuleFile, Set[DirectImport]]: + import_scanner: AbstractImportScanner = settings.IMPORT_SCANNER_CLASS( + file_system=file_system, + found_packages=found_packages, + include_external_packages=include_external_packages, + ) + + imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} + + with multiprocessing.Pool(len(chunks)) as pool: + import_scanning_jobs = pool.starmap( + _scan_chunk, + [(import_scanner, exclude_type_checking_imports, chunk) for chunk in chunks], + ) + for chunk_imports_by_module_file in import_scanning_jobs: + imports_by_module_file.update(chunk_imports_by_module_file) + + return imports_by_module_file + + def _scan_chunk( import_scanner: AbstractImportScanner, exclude_type_checking_imports: bool, From e3fdbb4e8cf063eb7fc214ae1348baf8fb926d95 Mon Sep 17 00:00:00 2001 From: David Seddon Date: Fri, 11 Apr 2025 10:28:56 +0100 Subject: [PATCH 3/4] Don't spawn processes if there is only one chunk --- src/grimp/application/usecases.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py index e2c8b41e..a1511917 100644 --- a/src/grimp/application/usecases.py +++ b/src/grimp/application/usecases.py @@ -249,17 +249,21 @@ def _scan_chunks( include_external_packages=include_external_packages, ) - imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} - - with multiprocessing.Pool(len(chunks)) as pool: - import_scanning_jobs = pool.starmap( - _scan_chunk, - [(import_scanner, exclude_type_checking_imports, chunk) for chunk in chunks], - ) - for chunk_imports_by_module_file in import_scanning_jobs: - imports_by_module_file.update(chunk_imports_by_module_file) - - return imports_by_module_file + number_of_processes = len(chunks) + if number_of_processes == 1: + # No need to spawn a process if there's only one chunk. + [chunk] = chunks + return _scan_chunk(import_scanner, exclude_type_checking_imports, chunk) + else: + with multiprocessing.Pool(number_of_processes) as pool: + imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} + import_scanning_jobs = pool.starmap( + _scan_chunk, + [(import_scanner, exclude_type_checking_imports, chunk) for chunk in chunks], + ) + for chunk_imports_by_module_file in import_scanning_jobs: + imports_by_module_file.update(chunk_imports_by_module_file) + return imports_by_module_file def _scan_chunk( From 059a5d9498b2459c79c0c6b316aa22af4f626b2e Mon Sep 17 00:00:00 2001 From: David Seddon Date: Fri, 11 Apr 2025 12:11:44 +0100 Subject: [PATCH 4/4] Don't parallelize scanning for smaller codebases This significantly speeds up the test suite, which had slowed down when we added scanning parallelization. --- src/grimp/application/usecases.py | 7 +++++ tests/functional/test_build_and_use_graph.py | 29 ++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py index a1511917..e19bb6e8 100644 --- a/src/grimp/application/usecases.py +++ b/src/grimp/application/usecases.py @@ -20,6 +20,10 @@ class NotSupplied: pass +# This is an arbitrary number, but setting it too low slows down our functional tests considerably. +MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING = 50 + + def build_graph( package_name, *additional_package_names, @@ -233,6 +237,9 @@ def _create_chunks(module_files: Collection[ModuleFile]) -> tuple[tuple[ModuleFi def _decide_number_of_of_processes(number_of_module_files: int) -> int: + if number_of_module_files < MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING: + # Don't incur the overhead of multiprocessing. + return 1 return min(multiprocessing.cpu_count(), number_of_module_files) diff --git a/tests/functional/test_build_and_use_graph.py b/tests/functional/test_build_and_use_graph.py index ed2ce877..76b728e7 100644 --- a/tests/functional/test_build_and_use_graph.py +++ b/tests/functional/test_build_and_use_graph.py @@ -1,6 +1,8 @@ from grimp import build_graph from typing import Set, Tuple, Optional import pytest +from unittest.mock import patch +from grimp.application import usecases """ For ease of reference, these are the imports of all the files: @@ -53,6 +55,33 @@ def test_modules(): } +@patch.object(usecases, "MIN_NUMBER_OF_MODULES_TO_SCAN_USING_MULTIPROCESSING", 0) +def test_modules_multiprocessing(): + """ + This test runs relatively slowly, but it's important we cover the multiprocessing code. + """ + graph = build_graph("testpackage", cache_dir=None) + + assert graph.modules == { + "testpackage", + "testpackage.one", + "testpackage.one.alpha", + "testpackage.one.beta", + "testpackage.one.gamma", + "testpackage.one.delta", + "testpackage.one.delta.blue", + "testpackage.two", + "testpackage.two.alpha", + "testpackage.two.beta", + "testpackage.two.gamma", + "testpackage.utils", + "testpackage.three", + "testpackage.three.beta", + "testpackage.three.gamma", + "testpackage.three.alpha", + } + + def test_add_module(): graph = build_graph("testpackage", cache_dir=None) number_of_modules = len(graph.modules)