diff --git a/cdisc_rules_engine/exceptions/custom_exceptions.py b/cdisc_rules_engine/exceptions/custom_exceptions.py
index 45c6ea06c..94da31a67 100644
--- a/cdisc_rules_engine/exceptions/custom_exceptions.py
+++ b/cdisc_rules_engine/exceptions/custom_exceptions.py
@@ -88,3 +88,15 @@ class SchemaNotFoundError(EngineError):
 class InvalidSchemaProvidedError(EngineError):
     code = 400
     description = "Failed to parse XMLSchema"
+
+
+class PreprocessingError(EngineError):
+    description = "Error occurred during dataset preprocessing"
+
+
+class OperationError(EngineError):
+    description = "Error occurred during operation execution"
+
+
+class DatasetBuilderError(EngineError):
+    description = "Error occurred during dataset building"
diff --git a/cdisc_rules_engine/rules_engine.py b/cdisc_rules_engine/rules_engine.py
index 795f8d185..90253697b 100644
--- a/cdisc_rules_engine/rules_engine.py
+++ b/cdisc_rules_engine/rules_engine.py
@@ -17,6 +17,9 @@
     DomainNotFoundError,
     InvalidSchemaProvidedError,
     SchemaNotFoundError,
+    PreprocessingError,
+    OperationError,
+    DatasetBuilderError,
 )
 from cdisc_rules_engine.interfaces import (
     CacheServiceInterface,
@@ -302,7 +305,15 @@ def validate_rule(
         """
         kwargs = {}
         builder = self.get_dataset_builder(rule, datasets, dataset_metadata)
-        dataset = builder.get_dataset()
+        try:
+            dataset = builder.get_dataset()
+        except Exception as e:
+            raise DatasetBuilderError(
+                f"Failed to build dataset for rule validation. "
+                f"Builder: {builder.__class__.__name__}, "
+                f"Dataset: {dataset_metadata.name}, "
+                f"Error: {str(e)}"
+            )
         # Update rule for certain rule types
         # SPECIAL CASES FOR RULE TYPES ###############################
         # TODO: Handle these special cases better.
@@ -516,6 +527,49 @@ def handle_validation_exceptions(  # noqa
                 message=exception.args[0],
             )
             message = "rule execution error"
+        elif isinstance(exception, PreprocessingError):
+            error_obj = FailedValidationEntity(
+                dataset=os.path.basename(dataset_path),
+                error=PreprocessingError.description,
+                message=str(exception),
+            )
+            message = "rule evaluation error - preprocessing failed"
+            errors = [error_obj]
+            return ValidationErrorContainer(
+                dataset=os.path.basename(dataset_path),
+                errors=errors,
+                message=message,
+                status=ExecutionStatus.SKIPPED.value,
+            )
+
+        elif isinstance(exception, OperationError):
+            error_obj = FailedValidationEntity(
+                dataset=os.path.basename(dataset_path),
+                error=OperationError.description,
+                message=str(exception),
+            )
+            message = "rule evaluation error - operation failed"
+            errors = [error_obj]
+            return ValidationErrorContainer(
+                dataset=os.path.basename(dataset_path),
+                errors=errors,
+                message=message,
+                status=ExecutionStatus.SKIPPED.value,
+            )
+        elif isinstance(exception, DatasetBuilderError):
+            error_obj = FailedValidationEntity(
+                dataset=os.path.basename(dataset_path),
+                error=DatasetBuilderError.description,
+                message=str(exception),
+            )
+            message = "rule evaluation error - evaluation dataset failed to build"
+            errors = [error_obj]
+            return ValidationErrorContainer(
+                dataset=os.path.basename(dataset_path),
+                errors=errors,
+                message=message,
+                status=ExecutionStatus.SKIPPED.value,
+            )
         elif isinstance(exception, FailedSchemaValidation):
             if self.validate_xml:
                 error_obj = FailedValidationEntity(
diff --git a/cdisc_rules_engine/utilities/dataset_preprocessor.py b/cdisc_rules_engine/utilities/dataset_preprocessor.py
index a86437d69..f1ebc647c 100644
--- a/cdisc_rules_engine/utilities/dataset_preprocessor.py
+++ b/cdisc_rules_engine/utilities/dataset_preprocessor.py
@@ -16,6 +16,7 @@
     get_sided_match_keys,
     get_dataset_name_from_details,
 )
+from cdisc_rules_engine.exceptions.custom_exceptions import PreprocessingError
 
 import os
 import pandas as pd
@@ -107,11 +108,26 @@ def preprocess(  # noqa
                 )
             )
         ]
+
+        if not file_infos:
+            raise PreprocessingError(
+                f"Failed to find related dataset for '{domain_name}' in preprocessor"
+            )
+
         for file_info in file_infos:
             if file_info.domain in merged_domains:
                 continue
+
             filename = get_dataset_name_from_details(file_info)
-            other_dataset: DatasetInterface = self._download_dataset(filename)
+
+            # Try to download the dataset
+            try:
+                other_dataset: DatasetInterface = self._download_dataset(filename)
+            except Exception as e:
+                raise PreprocessingError(
+                    f"Failed to download dataset '{filename}' for preprocessing: {str(e)}"
+                )
+
             referenced_targets = set(
                 [
                     target.replace(f"{domain_name}.", "")
@@ -146,26 +162,35 @@ right_dataset_domain_details=domain_details,
                 datasets=datasets,
             )
+            merged_domains.add(file_info.domain)
         return result
 
     def _find_parent_dataset(
         self, datasets: Iterable[SDTMDatasetMetadata], domain_details: dict
     ) -> SDTMDatasetMetadata:
         matching_datasets = []
-        if "RDOMAIN" in self._dataset.columns:
-            rdomain_column = self._dataset.data["RDOMAIN"]
-            unique_domains = set(rdomain_column.unique())
-            for dataset in datasets:
-                if dataset.domain in unique_domains:
-                    matching_datasets.append(dataset)
-        else:
-            match_keys = domain_details.get("match_key")
-            for dataset in datasets:
-                has_all_match_keys = all(
-                    match_key in dataset.first_record for match_key in match_keys
-                )
-                if has_all_match_keys:
-                    matching_datasets.append(dataset)
+        try:
+            if "RDOMAIN" in self._dataset.columns:
+                rdomain_column = self._dataset.data["RDOMAIN"]
+                unique_domains = set(rdomain_column.unique())
+                for dataset in datasets:
+                    if dataset.domain in unique_domains:
+                        matching_datasets.append(dataset)
+            else:
+                match_keys = domain_details.get("match_key")
+                for dataset in datasets:
+                    has_all_match_keys = all(
+                        match_key in dataset.first_record for match_key in match_keys
+                    )
+                    if has_all_match_keys:
+                        matching_datasets.append(dataset)
+        except Exception as e:
+            raise PreprocessingError(
+                f"Error during parent dataset search. "
+                f"Current dataset: {self._dataset_metadata.name}, "
+                f"Match keys: {domain_details.get('match_key')}, "
+                f"Error: {str(e)}"
+            )
         if not matching_datasets:
             logger.warning(
                 f"Child specified in match but no parent datasets found for: {domain_details}"
             )
@@ -214,14 +239,22 @@ def _child_merge_datasets(
             "--",
             right_dataset_domain_name,
         )
-        result = left_dataset.merge(
-            right_dataset.data,
-            how="left",
-            left_on=left_dataset_match_keys,
-            right_on=right_dataset_match_keys,
-            suffixes=("", f".{right_dataset_domain_name}"),
-        )
-        return result
+        try:
+            result = left_dataset.merge(
+                right_dataset.data,
+                how="left",
+                left_on=left_dataset_match_keys,
+                right_on=right_dataset_match_keys,
+                suffixes=("", f".{right_dataset_domain_name}"),
+            )
+            return result
+        except Exception as e:
+            raise PreprocessingError(
+                f"Merge operation failed during child merge. "
" + f"Left dataset: {left_dataset_domain_name}, " + f"Right dataset: {right_dataset_domain_name}, " + f"Error: {str(e)}" + ) def _classify_dataset( self, dataset: DatasetInterface, metadata: SDTMDatasetMetadata @@ -239,19 +272,28 @@ def _merge_rdomain_dataset( right_dataset_domain_name: str, match_keys: List[str], ) -> DatasetInterface: - relevant_child_records = self._get_relevant_child_records( - left_dataset, right_dataset_domain_name - ) - merged_records = self._merge_with_idvar_logic( - relevant_child_records, - right_dataset, - left_dataset_domain_name, - right_dataset_domain_name, - match_keys, - ) - return self._update_dataset_with_merged_records( - left_dataset, relevant_child_records, merged_records - ) + try: + relevant_child_records = self._get_relevant_child_records( + left_dataset, right_dataset_domain_name + ) + merged_records = self._merge_with_idvar_logic( + relevant_child_records, + right_dataset, + left_dataset_domain_name, + right_dataset_domain_name, + match_keys, + ) + return self._update_dataset_with_merged_records( + left_dataset, relevant_child_records, merged_records + ) + except Exception as e: + raise PreprocessingError( + f"Failed to merge with IDVAR logic. " + f"Left dataset: {left_dataset_domain_name}, " + f"Right dataset: {right_dataset_domain_name}, " + f"Match keys: {match_keys}, " + f"Error: {str(e)}" + ) def _get_relevant_child_records( self, left_dataset: DatasetInterface, parent_domain: str @@ -457,7 +499,7 @@ def _update_dataset_with_merged_records( updated_data = pd.concat([remaining_records, merged_records], ignore_index=True) return self._dataset.__class__(data=updated_data) - def _merge_datasets( + def _merge_datasets( # noqa self, left_dataset: DatasetInterface, left_dataset_domain_name: str, @@ -487,31 +529,56 @@ def _merge_datasets( # merge datasets based on their type if right_dataset_domain_name == "RELREC": - result: DatasetInterface = DataProcessor.merge_relrec_datasets( - left_dataset=left_dataset, - left_dataset_domain_name=left_dataset_domain_name, - relrec_dataset=right_dataset, - datasets=datasets, - dataset_preprocessor=self, - wildcard=right_dataset_domain_details.get("wildcard"), - ) + try: + result: DatasetInterface = DataProcessor.merge_relrec_datasets( + left_dataset=left_dataset, + left_dataset_domain_name=left_dataset_domain_name, + relrec_dataset=right_dataset, + datasets=datasets, + dataset_preprocessor=self, + wildcard=right_dataset_domain_details.get("wildcard"), + ) + except Exception as e: + raise PreprocessingError( + f"Failed to merge RELREC dataset in preprocessing. " + f"Left dataset: {left_dataset_domain_name}, " + f"RELREC dataset: {right_dataset_domain_name}, " + f"Wildcard: {right_dataset_domain_details.get('wildcard')}, " + f"Match keys: {match_keys}, " + f"Error: {str(e)}" + ) elif right_dataset_domain_name.startswith( "SUPP" ) or right_dataset_domain_name.startswith("SQ"): - result = DataProcessor.merge_pivot_supp_dataset( - dataset_implementation=self._data_service.dataset_implementation, - left_dataset=left_dataset, - right_dataset=right_dataset, - ) + try: + result: DatasetInterface = DataProcessor.merge_pivot_supp_dataset( + dataset_implementation=self._data_service.dataset_implementation, + left_dataset=left_dataset, + right_dataset=right_dataset, + ) + except Exception as e: + raise PreprocessingError( + f"Failed to merge supplemental/qualifier dataset. 
" + f"Left dataset: {left_dataset_domain_name} ({len(left_dataset)} rows), " + f"SUPP/SQ dataset: {right_dataset_domain_name} ({len(right_dataset)} rows), " + f"Error: {str(e)}" + ) else: - result: DatasetInterface = DataProcessor.merge_sdtm_datasets( - left_dataset=left_dataset, - right_dataset=right_dataset, - left_dataset_match_keys=left_dataset_match_keys, - right_dataset_match_keys=right_dataset_match_keys, - right_dataset_domain_name=right_dataset_domain_name, - join_type=JoinTypes( - right_dataset_domain_details.get("join_type", "inner") - ), - ) + try: + result: DatasetInterface = DataProcessor.merge_sdtm_datasets( + left_dataset=left_dataset, + right_dataset=right_dataset, + left_dataset_match_keys=left_dataset_match_keys, + right_dataset_match_keys=right_dataset_match_keys, + right_dataset_domain_name=right_dataset_domain_name, + join_type=JoinTypes( + right_dataset_domain_details.get("join_type", "inner") + ), + ) + except Exception as e: + raise PreprocessingError( + f"Failed to merge datasets. " + f"Left dataset: {left_dataset_domain_name}, Right dataset: {right_dataset_domain_name}, " + f"Error: {str(e)}" + ) return result diff --git a/cdisc_rules_engine/utilities/rule_processor.py b/cdisc_rules_engine/utilities/rule_processor.py index cc21d7ad8..c78236484 100644 --- a/cdisc_rules_engine/utilities/rule_processor.py +++ b/cdisc_rules_engine/utilities/rule_processor.py @@ -1,4 +1,7 @@ import re +import copy +import os + from typing import Iterable, List, Optional, Set, Union, Tuple from cdisc_rules_engine.enums.rule_types import RuleTypes from cdisc_rules_engine.interfaces.cache_service_interface import ( @@ -11,9 +14,6 @@ from cdisc_rules_engine.models.library_metadata_container import ( LibraryMetadataContainer, ) - -import copy -import os from cdisc_rules_engine.constants.classes import ( FINDINGS_ABOUT, FINDINGS, @@ -28,6 +28,7 @@ from cdisc_rules_engine.interfaces import ConditionInterface from cdisc_rules_engine.models.operation_params import OperationParams from cdisc_rules_engine.models.rule_conditions import AllowedConditionsKeys +from cdisc_rules_engine.exceptions.custom_exceptions import OperationError from cdisc_rules_engine.operations import operations_factory from cdisc_rules_engine.services import logger from cdisc_rules_engine.utilities.data_processor import DataProcessor @@ -424,11 +425,18 @@ def perform_rule_operations( value_is_reference=operation.get("value_is_reference", False), delimiter=operation.get("delimiter"), ) - - # execute operation - dataset_copy = self._execute_operation( - operation_params, dataset_copy, previous_operations - ) + try: + # execute operation + dataset_copy = self._execute_operation( + operation_params, dataset_copy, previous_operations + ) + except Exception as e: + raise OperationError( + f"Failed to execute rule operation. 
" + f"Operation: {operation_params.operation_name}, " + f"Target: {target}, Domain: {domain}, " + f"Error: {str(e)}" + ) previous_operations.append(operation_params.operation_name) logger.info( diff --git a/tests/unit/test_rules_engine.py b/tests/unit/test_rules_engine.py index 7e8bf872e..8573651cb 100644 --- a/tests/unit/test_rules_engine.py +++ b/tests/unit/test_rules_engine.py @@ -1292,16 +1292,18 @@ def test_validate_single_dataset_not_equal_to( ), [ { - "executionStatus": "execution_error", + "executionStatus": "skipped", "dataset": "ae.xpt", "domain": "AE", "variables": [], - "message": "rule execution error", + "message": "rule evaluation error - evaluation dataset failed to build", "errors": [ { "dataset": "ae.xpt", - "error": "An unknown exception has occurred", - "message": "single positional indexer is out-of-bounds", + "error": "Error occurred during dataset building", + "message": "Failed to build dataset for rule validation. Builder: " + "DatasetMetadataDefineDatasetBuilder, Dataset: AE, " + "Error: single positional indexer is out-of-bounds", } ], }