From 64c619e1517ed44893e6b7aa69775c630bfa5e2f Mon Sep 17 00:00:00 2001
From: Samuel Johnson
Date: Tue, 16 Dec 2025 10:06:24 -0500
Subject: [PATCH 1/2] manager shutdown

---
 scripts/run_validation.py | 145 ++++++++++++++++++++------------------
 1 file changed, 77 insertions(+), 68 deletions(-)

diff --git a/scripts/run_validation.py b/scripts/run_validation.py
index 8236e6863..1d63402d5 100644
--- a/scripts/run_validation.py
+++ b/scripts/run_validation.py
@@ -128,77 +128,86 @@ def run_validation(args: Validation_args):
     CacheManager.register("InMemoryCacheService", InMemoryCacheService)
     manager = CacheManager()
     manager.start()
-    shared_cache = get_cache_service(manager)
-    engine_logger.info(f"Populating cache, cache path: {args.cache}")
-    rules = get_rules(args)
-    library_metadata: LibraryMetadataContainer = get_library_metadata_from_cache(args)
-    max_dataset_size = get_max_dataset_size(args.dataset_paths)
-    standard = args.standard
-    standard_version = args.version.replace(".", "-")
-    standard_substandard = args.substandard
-    data_service = DataServiceFactory(
-        config,
-        shared_cache,
-        max_dataset_size=max_dataset_size,
-        standard=standard,
-        standard_version=standard_version,
-        standard_substandard=standard_substandard,
-        library_metadata=library_metadata,
-    ).get_data_service(args.dataset_paths)
-    # install dictionaries if needed
-    dictionary_versions = fill_cache_with_dictionaries(shared_cache, args, data_service)
-    large_dataset_validation: bool = (
-        data_service.dataset_implementation != PandasDataset
-    )
-    datasets = data_service.get_datasets()
-    created_files = []
-    if large_dataset_validation and data_service.standard != "usdm":
-        # convert all files to parquet temp files
-        engine_logger.warning(
-            "Large datasets must use parquet format, converting all datasets to parquet"
+    try:
+        shared_cache = get_cache_service(manager)
+        engine_logger.info(f"Populating cache, cache path: {args.cache}")
+        rules = get_rules(args)
+        library_metadata: LibraryMetadataContainer = get_library_metadata_from_cache(
+            args
         )
-        for dataset in datasets:
-            file_path = dataset.full_path
-            if file_path.endswith(".parquet"):
-                continue
-            num_rows, new_file = data_service.to_parquet(file_path)
-            created_files.append(new_file)
-            dataset.full_path = new_file
-            dataset.record_count = num_rows
-            dataset.original_path = file_path
-    engine_logger.info(f"Running {len(rules)} rules against {len(datasets)} datasets")
-    start = time.time()
-    results = []
-    # instantiate logger in each child process to maintain log level
-    initializer = partial(
-        initialize_logger, engine_logger.disabled, engine_logger._logger.level
-    )
-    # run each rule in a separate process
-    with Pool(args.pool_size, initializer=initializer) as pool:
-        validation_results: Iterable[RuleValidationResult] = pool.imap_unordered(
-            partial(
-                validate_single_rule, shared_cache, datasets, args, library_metadata
-            ),
-            rules,
+        max_dataset_size = get_max_dataset_size(args.dataset_paths)
+        standard = args.standard
+        standard_version = args.version.replace(".", "-")
+        standard_substandard = args.substandard
+        data_service = DataServiceFactory(
+            config,
+            shared_cache,
+            max_dataset_size=max_dataset_size,
+            standard=standard,
+            standard_version=standard_version,
+            standard_substandard=standard_substandard,
+            library_metadata=library_metadata,
+        ).get_data_service(args.dataset_paths)
+        # install dictionaries if needed
+        dictionary_versions = fill_cache_with_dictionaries(
+            shared_cache, args, data_service
+        )
+        large_dataset_validation: bool = (
+            data_service.dataset_implementation != PandasDataset
+        )
+        datasets = data_service.get_datasets()
+        created_files = []
+        if large_dataset_validation and data_service.standard != "usdm":
+            # convert all files to parquet temp files
+            engine_logger.warning(
+                "Large datasets must use parquet format, converting all datasets to parquet"
+            )
+            for dataset in datasets:
+                file_path = dataset.full_path
+                if file_path.endswith(".parquet"):
+                    continue
+                num_rows, new_file = data_service.to_parquet(file_path)
+                created_files.append(new_file)
+                dataset.full_path = new_file
+                dataset.record_count = num_rows
+                dataset.original_path = file_path
+        engine_logger.info(
+            f"Running {len(rules)} rules against {len(datasets)} datasets"
         )
-        progress_handler: Callable = get_progress_displayer(args)
-        results = progress_handler(rules, validation_results, results)
+        start = time.time()
+        results = []
+        # instantiate logger in each child process to maintain log level
+        initializer = partial(
+            initialize_logger, engine_logger.disabled, engine_logger._logger.level
+        )
+        # run each rule in a separate process
+        with Pool(args.pool_size, initializer=initializer) as pool:
+            validation_results: Iterable[RuleValidationResult] = pool.imap_unordered(
+                partial(
+                    validate_single_rule, shared_cache, datasets, args, library_metadata
+                ),
+                rules,
+            )
+            progress_handler: Callable = get_progress_displayer(args)
+            results = progress_handler(rules, validation_results, results)
 
-        # build all desired reports
-        end = time.time()
-        elapsed_time = end - start
-        engine_logger.info("Done Rule execution, creating reports")
-        reporting_factory = ReportFactory(
-            datasets, results, elapsed_time, args, data_service, dictionary_versions
-        )
-        reporting_services: List[BaseReport] = reporting_factory.get_report_services()
-        for reporting_service in reporting_services:
-            reporting_service.write_report()
-        print(f"Output: {args.output}")
-        engine_logger.info(" Report generated, Cleaning up intermediate files")
-        for file in created_files:
-            engine_logger.info(f"Deleting file {file}")
-            os.remove(file)
+            # build all desired reports
+            end = time.time()
+            elapsed_time = end - start
+            engine_logger.info("Done Rule execution, creating reports")
+            reporting_factory = ReportFactory(
+                datasets, results, elapsed_time, args, data_service, dictionary_versions
+            )
+            reporting_services: List[BaseReport] = reporting_factory.get_report_services()
+            for reporting_service in reporting_services:
+                reporting_service.write_report()
+            print(f"Output: {args.output}")
+            engine_logger.info(" Report generated, Cleaning up intermediate files")
+            for file in created_files:
+                engine_logger.info(f"Deleting file {file}")
+                os.remove(file)
+    finally:
+        manager.shutdown()
 
 
 def run_single_rule_validation(
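PATCH 1/2 in short: CacheManager.start() launches a separate server process that hosts the shared cache, and previously manager.shutdown() was only reached when the body completed without raising; the try/finally makes shutdown explicit and prompt on every exit path. Below is a minimal, self-contained sketch of the same lifecycle, not the engine's code: CacheManager here is a bare multiprocessing.managers.BaseManager subclass, and the toy dict registration stands in for InMemoryCacheService.

from multiprocessing.managers import BaseManager


class CacheManager(BaseManager):
    """Stand-in for the engine's manager class; real registrations differ."""


CacheManager.register("dict", dict)  # toy shared type for the demo


def run() -> None:
    manager = CacheManager()
    manager.start()  # spawns the manager's server process
    try:
        shared = manager.dict()  # proxy to an object living in the server
        shared.update({"status": "running"})  # validation work goes here
    finally:
        # Reached on success, on exception, and on early return alike,
        # so the server process cannot outlive the run.
        manager.shutdown()


if __name__ == "__main__":
    run()

Note that manager.start() stays outside the try, exactly as in the patch: if startup itself fails, there is no server process to shut down. BaseManager also supports the context-management protocol, so "with manager:" would start the server if needed and call shutdown() on exit; the explicit try/finally simply reads closer to the diff.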
From cd4b81e80a6a26782405317fa6cd3f602a6ab5b6 Mon Sep 17 00:00:00 2001
From: Samuel Johnson
Date: Wed, 17 Dec 2025 09:48:40 -0500
Subject: [PATCH 2/2] moved file deletion to finally

---
 scripts/run_validation.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/scripts/run_validation.py b/scripts/run_validation.py
index 1d63402d5..ba5cddf26 100644
--- a/scripts/run_validation.py
+++ b/scripts/run_validation.py
@@ -202,11 +202,15 @@ def run_validation(args: Validation_args):
             for reporting_service in reporting_services:
                 reporting_service.write_report()
             print(f"Output: {args.output}")
-            engine_logger.info(" Report generated, Cleaning up intermediate files")
-            for file in created_files:
-                engine_logger.info(f"Deleting file {file}")
-                os.remove(file)
     finally:
+        if created_files:
+            engine_logger.info(" Report generated, Cleaning up intermediate files")
+            for file in created_files:
+                try:
+                    engine_logger.info(f"Deleting file {file}")
+                    os.remove(file)
+                except Exception as e:
+                    engine_logger.warning(f"Failed to delete {file}: {e}")
         manager.shutdown()
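PATCH 2/2 tightens the cleanup in two ways: deletion of the intermediate parquet files moves into the finally block, so it also happens when rule execution or report writing raises, and each deletion is individually guarded, so one locked or already-removed file neither aborts the remaining deletions nor raises out of the finally block and masks the exception already propagating. A minimal sketch of that best-effort pattern, with stand-in names; os.remove raises OSError subclasses (FileNotFoundError, PermissionError, ...), so catching OSError is a narrower alternative to the patch's blanket Exception.

import logging
import os

logger = logging.getLogger("validation.cleanup")


def remove_created_files(created_files: list[str]) -> None:
    # Intended to run inside a finally block, so it must never raise:
    # an exception here would mask whatever error is already propagating.
    for path in created_files:
        try:
            os.remove(path)
        except OSError as exc:
            # One undeletable file should not abort cleanup of the rest.
            logger.warning("Failed to delete %s: %s", path, exc)

The new "if created_files:" guard also keeps the "Cleaning up intermediate files" log line out of runs that produced no temporary parquet files, which is every run that stays on the PandasDataset path.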