From b30acb127c29ff6de7315dd44871e723e468a974 Mon Sep 17 00:00:00 2001 From: Peter Byfield Date: Fri, 4 Apr 2025 10:14:50 +0200 Subject: [PATCH 1/7] Make SourceSyntaxError pickleable This is needed to ensure that the error can be sent between processes. This ensures that the test `test_syntax_error_includes_module` still passes after using multiprocessing. --- src/grimp/exceptions.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/grimp/exceptions.py b/src/grimp/exceptions.py index 7b6530ae..a6baa169 100644 --- a/src/grimp/exceptions.py +++ b/src/grimp/exceptions.py @@ -62,6 +62,11 @@ def __eq__(self, other): other.text, ) + def __reduce__(self): + # Implement __reduce__ to make this exception pickleable, + # allowing it to be sent between processes. + return SourceSyntaxError, (self.filename, self.lineno, self.text) + class InvalidModuleExpression(GrimpException): pass From b6564368887ad9054b31b7d263c93c7c2aaf6723 Mon Sep 17 00:00:00 2001 From: Peter Byfield Date: Fri, 4 Apr 2025 09:45:29 +0200 Subject: [PATCH 2/7] Separate cache lookup from import scanning This is helpful, to avoid having to pass the large cache object to the multiprocessing code. --- src/grimp/application/usecases.py | 36 +++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py index 3fc8e4a7..74b2e78d 100644 --- a/src/grimp/application/usecases.py +++ b/src/grimp/application/usecases.py @@ -7,7 +7,7 @@ from ..application.ports.filesystem import AbstractFileSystem from ..application.ports.graph import ImportGraph from ..application.ports.importscanner import AbstractImportScanner -from ..application.ports.modulefinder import AbstractModuleFinder, FoundPackage +from ..application.ports.modulefinder import AbstractModuleFinder, FoundPackage, ModuleFile from ..application.ports.packagefinder import AbstractPackageFinder from ..domain.valueobjects import DirectImport, Module from .config import settings @@ -106,7 +106,6 @@ def _scan_packages( exclude_type_checking_imports: bool, cache_dir: Union[str, Type[NotSupplied], None], ) -> Dict[Module, Set[DirectImport]]: - imports_by_module: Dict[Module, Set[DirectImport]] = {} if cache_dir is not None: cache_dir_if_supplied = cache_dir if cache_dir != NotSupplied else None cache: caching.Cache = settings.CACHE_CLASS.setup( @@ -122,18 +121,33 @@ def _scan_packages( include_external_packages=include_external_packages, ) - for found_package in found_packages: - for module_file in found_package.module_files: - module = module_file.module + module_files_to_scan = { + module_file + for found_package in found_packages + for module_file in found_package.module_files + } + imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} + + if cache_dir is not None: + for module_file in module_files_to_scan: try: - if cache_dir is None: - raise caching.CacheMiss direct_imports = cache.read_imports(module_file) except caching.CacheMiss: - direct_imports = import_scanner.scan_for_imports( - module, exclude_type_checking_imports=exclude_type_checking_imports - ) - imports_by_module[module] = direct_imports + continue + else: + imports_by_module_file[module_file] = direct_imports + + remaining_module_files_to_scan = module_files_to_scan.difference(imports_by_module_file) + if remaining_module_files_to_scan: + # TODO Parallelise this part. + for module_file in remaining_module_files_to_scan: + imports_by_module_file[module_file] = import_scanner.scan_for_imports( + module_file.module, exclude_type_checking_imports=exclude_type_checking_imports + ) + + imports_by_module: Dict[Module, Set[DirectImport]] = { + k.module: v for k, v in imports_by_module_file.items() + } if cache_dir is not None: cache.write(imports_by_module) From b9444c03a8d751af709386906ba47a614e8da4af Mon Sep 17 00:00:00 2001 From: Peter Byfield Date: Sat, 5 Apr 2025 16:55:23 +0200 Subject: [PATCH 3/7] Break up _scan_packages into smaller functions --- src/grimp/application/usecases.py | 65 +++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py index 74b2e78d..acf93d03 100644 --- a/src/grimp/application/usecases.py +++ b/src/grimp/application/usecases.py @@ -1,7 +1,9 @@ """ Use cases handle application logic. """ -from typing import Dict, Sequence, Set, Type, Union, cast + +from typing import Dict, Sequence, Set, Type, Union, cast, Iterable + from ..application.ports import caching from ..application.ports.filesystem import AbstractFileSystem @@ -115,35 +117,29 @@ def _scan_packages( exclude_type_checking_imports=exclude_type_checking_imports, cache_dir=cache_dir_if_supplied, ) - import_scanner: AbstractImportScanner = settings.IMPORT_SCANNER_CLASS( - file_system=file_system, - found_packages=found_packages, - include_external_packages=include_external_packages, - ) module_files_to_scan = { module_file for found_package in found_packages for module_file in found_package.module_files } + imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} if cache_dir is not None: - for module_file in module_files_to_scan: - try: - direct_imports = cache.read_imports(module_file) - except caching.CacheMiss: - continue - else: - imports_by_module_file[module_file] = direct_imports + imports_by_module_file.update(_read_imports_from_cache(module_files_to_scan, cache=cache)) remaining_module_files_to_scan = module_files_to_scan.difference(imports_by_module_file) if remaining_module_files_to_scan: - # TODO Parallelise this part. - for module_file in remaining_module_files_to_scan: - imports_by_module_file[module_file] = import_scanner.scan_for_imports( - module_file.module, exclude_type_checking_imports=exclude_type_checking_imports + imports_by_module_file.update( + _scan_imports( + remaining_module_files_to_scan, + file_system=file_system, + found_packages=found_packages, + include_external_packages=include_external_packages, + exclude_type_checking_imports=exclude_type_checking_imports, ) + ) imports_by_module: Dict[Module, Set[DirectImport]] = { k.module: v for k, v in imports_by_module_file.items() @@ -186,3 +182,38 @@ def _is_external(module: Module, found_packages: Set[FoundPackage]) -> bool: module.is_descendant_of(package_module) or module == package_module for package_module in package_modules ) + + +def _read_imports_from_cache( + module_files: Iterable[ModuleFile], *, cache: caching.Cache +) -> Dict[ModuleFile, Set[DirectImport]]: + imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} + for module_file in module_files: + try: + direct_imports = cache.read_imports(module_file) + except caching.CacheMiss: + continue + else: + imports_by_module_file[module_file] = direct_imports + return imports_by_module_file + + +def _scan_imports( + module_files: Iterable[ModuleFile], + *, + file_system: AbstractFileSystem, + found_packages: Set[FoundPackage], + include_external_packages: bool, + exclude_type_checking_imports: bool, +) -> Dict[ModuleFile, Set[DirectImport]]: + import_scanner: AbstractImportScanner = settings.IMPORT_SCANNER_CLASS( + file_system=file_system, + found_packages=found_packages, + include_external_packages=include_external_packages, + ) + return { + module_file: import_scanner.scan_for_imports( + module_file.module, exclude_type_checking_imports=exclude_type_checking_imports + ) + for module_file in module_files + } From c16fc2c09f859784d3beae549186726907709888 Mon Sep 17 00:00:00 2001 From: Peter Byfield Date: Fri, 4 Apr 2025 09:50:14 +0200 Subject: [PATCH 4/7] Add joblib A nicer interface to multiprocessing See https://joblib.readthedocs.io/en/stable/parallel.html --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index e04d1a94..b5b8b888 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ authors = [ ] requires-python = ">=3.9" dependencies = [ + "joblib~=1.4", "typing-extensions>=3.10.0.0", ] classifiers = [ From fd97e47c6765436c47d3c836128fbc86ad8ec1bf Mon Sep 17 00:00:00 2001 From: Peter Byfield Date: Sat, 5 Apr 2025 16:58:31 +0200 Subject: [PATCH 5/7] Parallelise import scanning --- src/grimp/application/usecases.py | 48 +++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py index acf93d03..dde6fd3b 100644 --- a/src/grimp/application/usecases.py +++ b/src/grimp/application/usecases.py @@ -2,7 +2,11 @@ Use cases handle application logic. """ -from typing import Dict, Sequence, Set, Type, Union, cast, Iterable +from typing import Dict, Sequence, Set, Type, Union, cast, Iterable, Collection +import multiprocessing +import math + +from joblib import Parallel, delayed, parallel_config # type: ignore from ..application.ports import caching @@ -14,6 +18,10 @@ from ..domain.valueobjects import DirectImport, Module from .config import settings +N_CPUS = multiprocessing.cpu_count() +# Chunks smaller than this will likely not give a performance benefit. +IMPORT_SCANNING_MIN_FILES_PER_CHUNK = 8 + class NotSupplied: pass @@ -199,7 +207,7 @@ def _read_imports_from_cache( def _scan_imports( - module_files: Iterable[ModuleFile], + module_files: Collection[ModuleFile], *, file_system: AbstractFileSystem, found_packages: Set[FoundPackage], @@ -211,9 +219,43 @@ def _scan_imports( found_packages=found_packages, include_external_packages=include_external_packages, ) + + imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {} + + # Create [1, N_CPUS] chunks, limited by IMPORT_SCANNING_MIN_FILES_PER_CHUNK. + max_n_chunks = max(math.floor(len(module_files) / IMPORT_SCANNING_MIN_FILES_PER_CHUNK), 1) + n_chunks = min(N_CPUS, max_n_chunks) + + chunks = _create_chunks(list(module_files), n_chunks=n_chunks) + with parallel_config(n_jobs=n_chunks): + import_scanning_jobs = Parallel()( + delayed(_scan_chunk)( + import_scanner=import_scanner, + exclude_type_checking_imports=exclude_type_checking_imports, + chunk=chunk, + ) + for chunk in chunks + ) + for chunk_imports_by_module_file in import_scanning_jobs: + imports_by_module_file.update(chunk_imports_by_module_file) + return imports_by_module_file + + +def _create_chunks( + module_files: Sequence[ModuleFile], *, n_chunks: int +) -> Iterable[Iterable[ModuleFile]]: + chunk_size = math.ceil(len(module_files) / n_chunks) + return [module_files[i * chunk_size : (i + 1) * chunk_size] for i in range(n_chunks)] + + +def _scan_chunk( + import_scanner: AbstractImportScanner, + exclude_type_checking_imports: bool, + chunk: Iterable[ModuleFile], +) -> Dict[ModuleFile, Set[DirectImport]]: return { module_file: import_scanner.scan_for_imports( module_file.module, exclude_type_checking_imports=exclude_type_checking_imports ) - for module_file in module_files + for module_file in chunk } From 678e80015bb6ca65d3507f289dc1ec13e406e73b Mon Sep 17 00:00:00 2001 From: Peter Byfield Date: Fri, 4 Apr 2025 20:50:05 +0200 Subject: [PATCH 6/7] Update changelog --- CHANGELOG.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3fe3702b..5a9fff84 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,11 @@ Changelog ========= +Unreleased +---------- + +* Accelerate import scanning via CPU parallelism (multiprocessing). + 3.7.1 (2025-03-12) ------------------ From cf2b9c2080bcc56bc205d89dd040d7bed012c526 Mon Sep 17 00:00:00 2001 From: Peter Byfield Date: Sat, 5 Apr 2025 17:01:59 +0200 Subject: [PATCH 7/7] fixup: use multiprocessing.Pool instead of joblib --- pyproject.toml | 1 - src/grimp/application/usecases.py | 20 +++++++------------- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b5b8b888..e04d1a94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,6 @@ authors = [ ] requires-python = ">=3.9" dependencies = [ - "joblib~=1.4", "typing-extensions>=3.10.0.0", ] classifiers = [ diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py index dde6fd3b..733299aa 100644 --- a/src/grimp/application/usecases.py +++ b/src/grimp/application/usecases.py @@ -6,9 +6,6 @@ import multiprocessing import math -from joblib import Parallel, delayed, parallel_config # type: ignore - - from ..application.ports import caching from ..application.ports.filesystem import AbstractFileSystem from ..application.ports.graph import ImportGraph @@ -227,17 +224,14 @@ def _scan_imports( n_chunks = min(N_CPUS, max_n_chunks) chunks = _create_chunks(list(module_files), n_chunks=n_chunks) - with parallel_config(n_jobs=n_chunks): - import_scanning_jobs = Parallel()( - delayed(_scan_chunk)( - import_scanner=import_scanner, - exclude_type_checking_imports=exclude_type_checking_imports, - chunk=chunk, - ) - for chunk in chunks + with multiprocessing.Pool(n_chunks) as pool: + import_scanning_jobs = pool.starmap( + _scan_chunk, + [(import_scanner, exclude_type_checking_imports, chunk) for chunk in chunks], ) - for chunk_imports_by_module_file in import_scanning_jobs: - imports_by_module_file.update(chunk_imports_by_module_file) + for chunk_imports_by_module_file in import_scanning_jobs: + imports_by_module_file.update(chunk_imports_by_module_file) + return imports_by_module_file