149 changes: 81 additions & 68 deletions scripts/run_validation.py
@@ -128,77 +128,90 @@ def run_validation(args: Validation_args):
     CacheManager.register("InMemoryCacheService", InMemoryCacheService)
     manager = CacheManager()
     manager.start()
-    shared_cache = get_cache_service(manager)
-    engine_logger.info(f"Populating cache, cache path: {args.cache}")
-    rules = get_rules(args)
-    library_metadata: LibraryMetadataContainer = get_library_metadata_from_cache(args)
-    max_dataset_size = get_max_dataset_size(args.dataset_paths)
-    standard = args.standard
-    standard_version = args.version.replace(".", "-")
-    standard_substandard = args.substandard
-    data_service = DataServiceFactory(
-        config,
-        shared_cache,
-        max_dataset_size=max_dataset_size,
-        standard=standard,
-        standard_version=standard_version,
-        standard_substandard=standard_substandard,
-        library_metadata=library_metadata,
-    ).get_data_service(args.dataset_paths)
-    # install dictionaries if needed
-    dictionary_versions = fill_cache_with_dictionaries(shared_cache, args, data_service)
-    large_dataset_validation: bool = (
-        data_service.dataset_implementation != PandasDataset
-    )
-    datasets = data_service.get_datasets()
-    created_files = []
-    if large_dataset_validation and data_service.standard != "usdm":
-        # convert all files to parquet temp files
-        engine_logger.warning(
-            "Large datasets must use parquet format, converting all datasets to parquet"
+    try:
+        shared_cache = get_cache_service(manager)
+        engine_logger.info(f"Populating cache, cache path: {args.cache}")
+        rules = get_rules(args)
+        library_metadata: LibraryMetadataContainer = get_library_metadata_from_cache(
+            args
         )
-        for dataset in datasets:
-            file_path = dataset.full_path
-            if file_path.endswith(".parquet"):
-                continue
-            num_rows, new_file = data_service.to_parquet(file_path)
-            created_files.append(new_file)
-            dataset.full_path = new_file
-            dataset.record_count = num_rows
-            dataset.original_path = file_path
-    engine_logger.info(f"Running {len(rules)} rules against {len(datasets)} datasets")
-    start = time.time()
-    results = []
-    # instantiate logger in each child process to maintain log level
-    initializer = partial(
-        initialize_logger, engine_logger.disabled, engine_logger._logger.level
-    )
-    # run each rule in a separate process
-    with Pool(args.pool_size, initializer=initializer) as pool:
-        validation_results: Iterable[RuleValidationResult] = pool.imap_unordered(
-            partial(
-                validate_single_rule, shared_cache, datasets, args, library_metadata
-            ),
-            rules,
+        max_dataset_size = get_max_dataset_size(args.dataset_paths)
+        standard = args.standard
+        standard_version = args.version.replace(".", "-")
+        standard_substandard = args.substandard
+        data_service = DataServiceFactory(
+            config,
+            shared_cache,
+            max_dataset_size=max_dataset_size,
+            standard=standard,
+            standard_version=standard_version,
+            standard_substandard=standard_substandard,
+            library_metadata=library_metadata,
+        ).get_data_service(args.dataset_paths)
+        # install dictionaries if needed
+        dictionary_versions = fill_cache_with_dictionaries(
+            shared_cache, args, data_service
+        )
+        large_dataset_validation: bool = (
+            data_service.dataset_implementation != PandasDataset
+        )
+        datasets = data_service.get_datasets()
+        created_files = []
+        if large_dataset_validation and data_service.standard != "usdm":
+            # convert all files to parquet temp files
+            engine_logger.warning(
+                "Large datasets must use parquet format, converting all datasets to parquet"
+            )
+            for dataset in datasets:
+                file_path = dataset.full_path
+                if file_path.endswith(".parquet"):
+                    continue
+                num_rows, new_file = data_service.to_parquet(file_path)
+                created_files.append(new_file)
+                dataset.full_path = new_file
+                dataset.record_count = num_rows
+                dataset.original_path = file_path
+        engine_logger.info(
+            f"Running {len(rules)} rules against {len(datasets)} datasets"
+        )
-        progress_handler: Callable = get_progress_displayer(args)
-        results = progress_handler(rules, validation_results, results)
+        start = time.time()
+        results = []
+        # instantiate logger in each child process to maintain log level
+        initializer = partial(
+            initialize_logger, engine_logger.disabled, engine_logger._logger.level
+        )
+        # run each rule in a separate process
+        with Pool(args.pool_size, initializer=initializer) as pool:
+            validation_results: Iterable[RuleValidationResult] = pool.imap_unordered(
+                partial(
+                    validate_single_rule, shared_cache, datasets, args, library_metadata
+                ),
+                rules,
+            )
+            progress_handler: Callable = get_progress_displayer(args)
+            results = progress_handler(rules, validation_results, results)

-    # build all desired reports
-    end = time.time()
-    elapsed_time = end - start
-    engine_logger.info("Done Rule execution, creating reports")
-    reporting_factory = ReportFactory(
-        datasets, results, elapsed_time, args, data_service, dictionary_versions
-    )
-    reporting_services: List[BaseReport] = reporting_factory.get_report_services()
-    for reporting_service in reporting_services:
-        reporting_service.write_report()
-    print(f"Output: {args.output}")
-    engine_logger.info(" Report generated, Cleaning up intermediate files")
-    for file in created_files:
-        engine_logger.info(f"Deleting file {file}")
-        os.remove(file)
+        # build all desired reports
+        end = time.time()
+        elapsed_time = end - start
+        engine_logger.info("Done Rule execution, creating reports")
+        reporting_factory = ReportFactory(
+            datasets, results, elapsed_time, args, data_service, dictionary_versions
+        )
+        reporting_services: List[BaseReport] = reporting_factory.get_report_services()
+        for reporting_service in reporting_services:
+            reporting_service.write_report()
+        print(f"Output: {args.output}")
+    finally:
+        if created_files:
+            engine_logger.info(" Report generated, Cleaning up intermediate files")
+            for file in created_files:
+                try:
+                    engine_logger.info(f"Deleting file {file}")
+                    os.remove(file)
+                except Exception as e:
+                    engine_logger.warning(f"Failed to delete {file}: {e}")
+        manager.shutdown()


 def run_single_rule_validation(
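The restructuring above follows a common resource-cleanup pattern: start the multiprocessing manager, do all of the work inside try, and release the temporary files and the manager in finally so a failing rule cannot leak either. Below is a minimal, standalone sketch of that pattern; the helper names (do_validation_work, run_with_cleanup) and the bare SyncManager stand-in are illustrative only, not code from this repository.

import os
import tempfile
from multiprocessing.managers import SyncManager


def do_validation_work(shared_state, created_files):
    # Placeholder for the real work: create a temp file and record it so the
    # caller's finally block can remove it even if a later step raises.
    fd, path = tempfile.mkstemp(suffix=".parquet")
    os.close(fd)
    created_files.append(path)
    shared_state["converted"] = len(created_files)


def run_with_cleanup():
    manager = SyncManager()
    manager.start()
    created_files = []
    try:
        shared_state = manager.dict()  # stands in for the shared cache service
        do_validation_work(shared_state, created_files)
    finally:
        for path in created_files:
            try:
                os.remove(path)
            except OSError as exc:
                print(f"Failed to delete {path}: {exc}")
        manager.shutdown()  # always stop the manager process


if __name__ == "__main__":
    run_with_cleanup()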