diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 3fe3702b..5a9fff84 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -2,6 +2,11 @@
 Changelog
 =========
 
+Unreleased
+----------
+
+* Accelerate import scanning via CPU parallelism (multiprocessing).
+
 3.7.1 (2025-03-12)
 ------------------
 
diff --git a/src/grimp/application/usecases.py b/src/grimp/application/usecases.py
index 3fc8e4a7..733299aa 100644
--- a/src/grimp/application/usecases.py
+++ b/src/grimp/application/usecases.py
@@ -1,17 +1,24 @@
 """
 Use cases handle application logic.
 """
-from typing import Dict, Sequence, Set, Type, Union, cast
+
+from typing import Dict, Sequence, Set, Type, Union, cast, Iterable, Collection
+import multiprocessing
+import math
 
 from ..application.ports import caching
 from ..application.ports.filesystem import AbstractFileSystem
 from ..application.ports.graph import ImportGraph
 from ..application.ports.importscanner import AbstractImportScanner
-from ..application.ports.modulefinder import AbstractModuleFinder, FoundPackage
+from ..application.ports.modulefinder import AbstractModuleFinder, FoundPackage, ModuleFile
 from ..application.ports.packagefinder import AbstractPackageFinder
 from ..domain.valueobjects import DirectImport, Module
 from .config import settings
 
+N_CPUS = multiprocessing.cpu_count()
+# Chunks smaller than this will likely not give a performance benefit.
+IMPORT_SCANNING_MIN_FILES_PER_CHUNK = 8
+
 
 class NotSupplied:
     pass
@@ -106,7 +113,6 @@ def _scan_packages(
     exclude_type_checking_imports: bool,
     cache_dir: Union[str, Type[NotSupplied], None],
 ) -> Dict[Module, Set[DirectImport]]:
-    imports_by_module: Dict[Module, Set[DirectImport]] = {}
     if cache_dir is not None:
         cache_dir_if_supplied = cache_dir if cache_dir != NotSupplied else None
         cache: caching.Cache = settings.CACHE_CLASS.setup(
@@ -116,24 +122,33 @@ def _scan_packages(
             exclude_type_checking_imports=exclude_type_checking_imports,
             cache_dir=cache_dir_if_supplied,
         )
-    import_scanner: AbstractImportScanner = settings.IMPORT_SCANNER_CLASS(
-        file_system=file_system,
-        found_packages=found_packages,
-        include_external_packages=include_external_packages,
-    )
-    for found_package in found_packages:
-        for module_file in found_package.module_files:
-            module = module_file.module
-            try:
-                if cache_dir is None:
-                    raise caching.CacheMiss
-                direct_imports = cache.read_imports(module_file)
-            except caching.CacheMiss:
-                direct_imports = import_scanner.scan_for_imports(
-                    module, exclude_type_checking_imports=exclude_type_checking_imports
-                )
-            imports_by_module[module] = direct_imports
+    module_files_to_scan = {
+        module_file
+        for found_package in found_packages
+        for module_file in found_package.module_files
+    }
+
+    imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {}
+
+    if cache_dir is not None:
+        imports_by_module_file.update(_read_imports_from_cache(module_files_to_scan, cache=cache))
+
+    remaining_module_files_to_scan = module_files_to_scan.difference(imports_by_module_file)
+    if remaining_module_files_to_scan:
+        imports_by_module_file.update(
+            _scan_imports(
+                remaining_module_files_to_scan,
+                file_system=file_system,
+                found_packages=found_packages,
+                include_external_packages=include_external_packages,
+                exclude_type_checking_imports=exclude_type_checking_imports,
+            )
+        )
+
+    imports_by_module: Dict[Module, Set[DirectImport]] = {
+        k.module: v for k, v in imports_by_module_file.items()
+    }
 
     if cache_dir is not None:
         cache.write(imports_by_module)
 
@@ -172,3 +187,69 @@ def _is_external(module: Module, found_packages: Set[FoundPackage]) -> bool:
         module.is_descendant_of(package_module) or module == package_module
         for package_module in package_modules
     )
+
+
+def _read_imports_from_cache(
+    module_files: Iterable[ModuleFile], *, cache: caching.Cache
+) -> Dict[ModuleFile, Set[DirectImport]]:
+    imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {}
+    for module_file in module_files:
+        try:
+            direct_imports = cache.read_imports(module_file)
+        except caching.CacheMiss:
+            continue
+        else:
+            imports_by_module_file[module_file] = direct_imports
+    return imports_by_module_file
+
+
+def _scan_imports(
+    module_files: Collection[ModuleFile],
+    *,
+    file_system: AbstractFileSystem,
+    found_packages: Set[FoundPackage],
+    include_external_packages: bool,
+    exclude_type_checking_imports: bool,
+) -> Dict[ModuleFile, Set[DirectImport]]:
+    import_scanner: AbstractImportScanner = settings.IMPORT_SCANNER_CLASS(
+        file_system=file_system,
+        found_packages=found_packages,
+        include_external_packages=include_external_packages,
+    )
+
+    imports_by_module_file: Dict[ModuleFile, Set[DirectImport]] = {}
+
+    # Create [1, N_CPUS] chunks, limited by IMPORT_SCANNING_MIN_FILES_PER_CHUNK.
+    max_n_chunks = max(math.floor(len(module_files) / IMPORT_SCANNING_MIN_FILES_PER_CHUNK), 1)
+    n_chunks = min(N_CPUS, max_n_chunks)
+
+    chunks = _create_chunks(list(module_files), n_chunks=n_chunks)
+    with multiprocessing.Pool(n_chunks) as pool:
+        import_scanning_jobs = pool.starmap(
+            _scan_chunk,
+            [(import_scanner, exclude_type_checking_imports, chunk) for chunk in chunks],
+        )
+        for chunk_imports_by_module_file in import_scanning_jobs:
+            imports_by_module_file.update(chunk_imports_by_module_file)
+
+    return imports_by_module_file
+
+
+def _create_chunks(
+    module_files: Sequence[ModuleFile], *, n_chunks: int
+) -> Iterable[Iterable[ModuleFile]]:
+    chunk_size = math.ceil(len(module_files) / n_chunks)
+    return [module_files[i * chunk_size : (i + 1) * chunk_size] for i in range(n_chunks)]
+
+
+def _scan_chunk(
+    import_scanner: AbstractImportScanner,
+    exclude_type_checking_imports: bool,
+    chunk: Iterable[ModuleFile],
+) -> Dict[ModuleFile, Set[DirectImport]]:
+    return {
+        module_file: import_scanner.scan_for_imports(
+            module_file.module, exclude_type_checking_imports=exclude_type_checking_imports
+        )
+        for module_file in chunk
+    }
diff --git a/src/grimp/exceptions.py b/src/grimp/exceptions.py
index 7b6530ae..a6baa169 100644
--- a/src/grimp/exceptions.py
+++ b/src/grimp/exceptions.py
@@ -62,6 +62,11 @@ def __eq__(self, other):
             other.text,
         )
 
+    def __reduce__(self):
+        # Implement __reduce__ to make this exception pickleable,
+        # allowing it to be sent between processes.
+        return SourceSyntaxError, (self.filename, self.lineno, self.text)
+
 
 class InvalidModuleExpression(GrimpException):
     pass
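
Note on the chunk sizing in _scan_imports: the number of chunks is capped at the CPU count, and is also kept low enough that each chunk holds at least IMPORT_SCANNING_MIN_FILES_PER_CHUNK files. The standalone sketch below is illustrative only and is not part of the patch; it assumes 4 CPUs, and choose_n_chunks / create_chunks are hypothetical helpers that mirror the arithmetic written inline in _scan_imports and _create_chunks.

import math

N_CPUS = 4  # Assumed for the example; the patch uses multiprocessing.cpu_count().
IMPORT_SCANNING_MIN_FILES_PER_CHUNK = 8


def choose_n_chunks(n_files: int) -> int:
    # Between 1 and N_CPUS chunks, but never so many that a chunk would
    # hold fewer than IMPORT_SCANNING_MIN_FILES_PER_CHUNK files.
    max_n_chunks = max(math.floor(n_files / IMPORT_SCANNING_MIN_FILES_PER_CHUNK), 1)
    return min(N_CPUS, max_n_chunks)


def create_chunks(items: list, n_chunks: int) -> list:
    # Contiguous, roughly equal-sized slices of the input list.
    chunk_size = math.ceil(len(items) / n_chunks)
    return [items[i * chunk_size : (i + 1) * chunk_size] for i in range(n_chunks)]


assert choose_n_chunks(5) == 1      # Too few files to be worth parallelising.
assert choose_n_chunks(20) == 2     # Two chunks of 10 files each.
assert choose_n_chunks(1000) == 4   # Capped at the (assumed) CPU count.
assert [len(c) for c in create_chunks(list(range(20)), 2)] == [10, 10]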
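
Note on the __reduce__ addition in exceptions.py: multiprocessing pickles an exception raised in a worker process so it can be re-raised in the parent, and an exception whose constructor takes several arguments is generally not reconstructible from the default reduction, which relies on Exception.args. The sketch below is illustrative only; it uses a hypothetical exception class rather than grimp's SourceSyntaxError to show the round trip that such an override enables.

import pickle


class HypotheticalSyntaxError(Exception):
    # A made-up exception with a similarly shaped constructor.
    def __init__(self, filename, lineno, text):
        super().__init__(f"Syntax error in {filename}, line {lineno}: {text}")
        self.filename = filename
        self.lineno = lineno
        self.text = text

    def __reduce__(self):
        # Rebuild the exception from its original constructor arguments rather
        # than from the single formatted message stored in self.args.
        return HypotheticalSyntaxError, (self.filename, self.lineno, self.text)


error = HypotheticalSyntaxError("mypackage/foo.py", 10, "import (")
roundtripped = pickle.loads(pickle.dumps(error))
assert (roundtripped.filename, roundtripped.lineno) == ("mypackage/foo.py", 10)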