Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cdisc_rules_engine/exceptions/custom_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,15 @@ class SchemaNotFoundError(EngineError):
class InvalidSchemaProvidedError(EngineError):
code = 400
description = "Failed to parse XMLSchema"


class PreprocessingError(EngineError):
description = "Error occurred during dataset preprocessing"


class OperationError(EngineError):
description = "Error occurred during operation execution"


class DatasetBuilderError(EngineError):
description = "Error occurred during dataset building"
56 changes: 55 additions & 1 deletion cdisc_rules_engine/rules_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
DomainNotFoundError,
InvalidSchemaProvidedError,
SchemaNotFoundError,
PreprocessingError,
OperationError,
DatasetBuilderError,
)
from cdisc_rules_engine.interfaces import (
CacheServiceInterface,
Expand Down Expand Up @@ -302,7 +305,15 @@ def validate_rule(
"""
kwargs = {}
builder = self.get_dataset_builder(rule, datasets, dataset_metadata)
dataset = builder.get_dataset()
try:
dataset = builder.get_dataset()
except Exception as e:
raise DatasetBuilderError(
f"Failed to build dataset for rule validation. "
f"Builder: {builder.__class__.__name__}, "
f"Dataset: {dataset_metadata.name}, "
f"Error: {str(e)}"
)
# Update rule for certain rule types
# SPECIAL CASES FOR RULE TYPES ###############################
# TODO: Handle these special cases better.
Expand Down Expand Up @@ -516,6 +527,49 @@ def handle_validation_exceptions( # noqa
message=exception.args[0],
)
message = "rule execution error"
elif isinstance(exception, PreprocessingError):
error_obj = FailedValidationEntity(
dataset=os.path.basename(dataset_path),
error=PreprocessingError.description,
message=str(exception),
)
message = "rule evaluation error - preprocessing failed"
errors = [error_obj]
return ValidationErrorContainer(
dataset=os.path.basename(dataset_path),
errors=errors,
message=message,
status=ExecutionStatus.SKIPPED.value,
)

elif isinstance(exception, OperationError):
error_obj = FailedValidationEntity(
dataset=os.path.basename(dataset_path),
error=OperationError.description,
message=str(exception),
)
message = "rule evaluation error - operation failed"
errors = [error_obj]
return ValidationErrorContainer(
dataset=os.path.basename(dataset_path),
errors=errors,
message=message,
status=ExecutionStatus.SKIPPED.value,
)
elif isinstance(exception, DatasetBuilderError):
error_obj = FailedValidationEntity(
dataset=os.path.basename(dataset_path),
error=DatasetBuilderError.description,
message=str(exception),
)
message = "rule evaluation error - evaluation dataset failed to build"
errors = [error_obj]
return ValidationErrorContainer(
dataset=os.path.basename(dataset_path),
errors=errors,
message=message,
status=ExecutionStatus.SKIPPED.value,
)
elif isinstance(exception, FailedSchemaValidation):
if self.validate_xml:
error_obj = FailedValidationEntity(
Expand Down
187 changes: 127 additions & 60 deletions cdisc_rules_engine/utilities/dataset_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
get_sided_match_keys,
get_dataset_name_from_details,
)
from cdisc_rules_engine.exceptions.custom_exceptions import PreprocessingError
import os
import pandas as pd

Expand Down Expand Up @@ -107,11 +108,26 @@ def preprocess( # noqa
)
)
]

if not file_infos:
raise PreprocessingError(
f"Failed to find related dataset for '{domain_name}' in preprocessor"
)

for file_info in file_infos:
if file_info.domain in merged_domains:
continue

filename = get_dataset_name_from_details(file_info)
other_dataset: DatasetInterface = self._download_dataset(filename)

# Try to download the dataset
try:
other_dataset: DatasetInterface = self._download_dataset(filename)
except Exception as e:
raise PreprocessingError(
f"Failed to download dataset '{filename}' for preprocessing: {str(e)}"
)

referenced_targets = set(
[
target.replace(f"{domain_name}.", "")
Expand Down Expand Up @@ -146,26 +162,35 @@ def preprocess( # noqa
right_dataset_domain_details=domain_details,
datasets=datasets,
)
merged_domains.add(file_info.domain)
return result

def _find_parent_dataset(
self, datasets: Iterable[SDTMDatasetMetadata], domain_details: dict
) -> SDTMDatasetMetadata:
matching_datasets = []
if "RDOMAIN" in self._dataset.columns:
rdomain_column = self._dataset.data["RDOMAIN"]
unique_domains = set(rdomain_column.unique())
for dataset in datasets:
if dataset.domain in unique_domains:
matching_datasets.append(dataset)
else:
match_keys = domain_details.get("match_key")
for dataset in datasets:
has_all_match_keys = all(
match_key in dataset.first_record for match_key in match_keys
)
if has_all_match_keys:
matching_datasets.append(dataset)
try:
if "RDOMAIN" in self._dataset.columns:
rdomain_column = self._dataset.data["RDOMAIN"]
unique_domains = set(rdomain_column.unique())
for dataset in datasets:
if dataset.domain in unique_domains:
matching_datasets.append(dataset)
else:
match_keys = domain_details.get("match_key")
for dataset in datasets:
has_all_match_keys = all(
match_key in dataset.first_record for match_key in match_keys
)
if has_all_match_keys:
matching_datasets.append(dataset)
except Exception as e:
raise PreprocessingError(
f"Error during parent dataset search. "
f"Current dataset: {self._dataset_metadata.name}, "
f"Match keys: {domain_details.get('match_key')}, "
f"Error: {str(e)}"
)
if not matching_datasets:
logger.warning(
f"Child specified in match but no parent datasets found for: {domain_details}"
Expand Down Expand Up @@ -214,14 +239,22 @@ def _child_merge_datasets(
"--",
right_dataset_domain_name,
)
result = left_dataset.merge(
right_dataset.data,
how="left",
left_on=left_dataset_match_keys,
right_on=right_dataset_match_keys,
suffixes=("", f".{right_dataset_domain_name}"),
)
return result
try:
result = left_dataset.merge(
right_dataset.data,
how="left",
left_on=left_dataset_match_keys,
right_on=right_dataset_match_keys,
suffixes=("", f".{right_dataset_domain_name}"),
)
return result
except Exception as e:
raise PreprocessingError(
f"Merge operation failed during child merge. "
f"Left dataset: {left_dataset_domain_name}, "
f"Right dataset: {right_dataset_domain_name}, "
f"Error: {str(e)}"
)

def _classify_dataset(
self, dataset: DatasetInterface, metadata: SDTMDatasetMetadata
Expand All @@ -239,19 +272,28 @@ def _merge_rdomain_dataset(
right_dataset_domain_name: str,
match_keys: List[str],
) -> DatasetInterface:
relevant_child_records = self._get_relevant_child_records(
left_dataset, right_dataset_domain_name
)
merged_records = self._merge_with_idvar_logic(
relevant_child_records,
right_dataset,
left_dataset_domain_name,
right_dataset_domain_name,
match_keys,
)
return self._update_dataset_with_merged_records(
left_dataset, relevant_child_records, merged_records
)
try:
relevant_child_records = self._get_relevant_child_records(
left_dataset, right_dataset_domain_name
)
merged_records = self._merge_with_idvar_logic(
relevant_child_records,
right_dataset,
left_dataset_domain_name,
right_dataset_domain_name,
match_keys,
)
return self._update_dataset_with_merged_records(
left_dataset, relevant_child_records, merged_records
)
except Exception as e:
raise PreprocessingError(
f"Failed to merge with IDVAR logic. "
f"Left dataset: {left_dataset_domain_name}, "
f"Right dataset: {right_dataset_domain_name}, "
f"Match keys: {match_keys}, "
f"Error: {str(e)}"
)

def _get_relevant_child_records(
self, left_dataset: DatasetInterface, parent_domain: str
Expand Down Expand Up @@ -457,7 +499,7 @@ def _update_dataset_with_merged_records(
updated_data = pd.concat([remaining_records, merged_records], ignore_index=True)
return self._dataset.__class__(data=updated_data)

def _merge_datasets(
def _merge_datasets( # noqa
self,
left_dataset: DatasetInterface,
left_dataset_domain_name: str,
Expand Down Expand Up @@ -487,31 +529,56 @@ def _merge_datasets(

# merge datasets based on their type
if right_dataset_domain_name == "RELREC":
result: DatasetInterface = DataProcessor.merge_relrec_datasets(
left_dataset=left_dataset,
left_dataset_domain_name=left_dataset_domain_name,
relrec_dataset=right_dataset,
datasets=datasets,
dataset_preprocessor=self,
wildcard=right_dataset_domain_details.get("wildcard"),
)
try:
result: DatasetInterface = DataProcessor.merge_relrec_datasets(
left_dataset=left_dataset,
left_dataset_domain_name=left_dataset_domain_name,
relrec_dataset=right_dataset,
datasets=datasets,
dataset_preprocessor=self,
wildcard=right_dataset_domain_details.get("wildcard"),
)
except Exception as e:
raise PreprocessingError(
f"Failed to merge RELREC dataset in preprocessing. "
f"Left dataset: {left_dataset_domain_name}, "
f"RELREC dataset: {right_dataset_domain_name}, "
f"Wildcard: {right_dataset_domain_details.get('wildcard')}, "
f"Match keys: {match_keys}, "
f"Error: {str(e)}"
)
elif right_dataset_domain_name.startswith(
"SUPP"
) or right_dataset_domain_name.startswith("SQ"):
result = DataProcessor.merge_pivot_supp_dataset(
dataset_implementation=self._data_service.dataset_implementation,
left_dataset=left_dataset,
right_dataset=right_dataset,
)
try:
result: DatasetInterface = DataProcessor.merge_pivot_supp_dataset(
dataset_implementation=self._data_service.dataset_implementation,
left_dataset=left_dataset,
right_dataset=right_dataset,
)
except Exception as e:
raise PreprocessingError(
f"Failed to merge supplemental/qualifier dataset. "
f"Left dataset: {left_dataset_domain_name} ({len(left_dataset)} rows), "
f"SUPP/SQ dataset: {right_dataset_domain_name} ({len(right_dataset)} rows), "
f"Error: {str(e)}"
)
else:
result: DatasetInterface = DataProcessor.merge_sdtm_datasets(
left_dataset=left_dataset,
right_dataset=right_dataset,
left_dataset_match_keys=left_dataset_match_keys,
right_dataset_match_keys=right_dataset_match_keys,
right_dataset_domain_name=right_dataset_domain_name,
join_type=JoinTypes(
right_dataset_domain_details.get("join_type", "inner")
),
)
try:
result: DatasetInterface = DataProcessor.merge_sdtm_datasets(
left_dataset=left_dataset,
right_dataset=right_dataset,
left_dataset_match_keys=left_dataset_match_keys,
right_dataset_match_keys=right_dataset_match_keys,
right_dataset_domain_name=right_dataset_domain_name,
join_type=JoinTypes(
right_dataset_domain_details.get("join_type", "inner")
),
)
except Exception as e:
raise PreprocessingError(
f"Failed to merge datasets. "
f"Left dataset: {left_dataset_domain_name}, Right dataset: {right_dataset_domain_name}, "
f"Error: {str(e)}"
)
return result
24 changes: 16 additions & 8 deletions cdisc_rules_engine/utilities/rule_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import re
import copy
import os

from typing import Iterable, List, Optional, Set, Union, Tuple
from cdisc_rules_engine.enums.rule_types import RuleTypes
from cdisc_rules_engine.interfaces.cache_service_interface import (
Expand All @@ -11,9 +14,6 @@
from cdisc_rules_engine.models.library_metadata_container import (
LibraryMetadataContainer,
)

import copy
import os
from cdisc_rules_engine.constants.classes import (
FINDINGS_ABOUT,
FINDINGS,
Expand All @@ -28,6 +28,7 @@
from cdisc_rules_engine.interfaces import ConditionInterface
from cdisc_rules_engine.models.operation_params import OperationParams
from cdisc_rules_engine.models.rule_conditions import AllowedConditionsKeys
from cdisc_rules_engine.exceptions.custom_exceptions import OperationError
from cdisc_rules_engine.operations import operations_factory
from cdisc_rules_engine.services import logger
from cdisc_rules_engine.utilities.data_processor import DataProcessor
Expand Down Expand Up @@ -424,11 +425,18 @@ def perform_rule_operations(
value_is_reference=operation.get("value_is_reference", False),
delimiter=operation.get("delimiter"),
)

# execute operation
dataset_copy = self._execute_operation(
operation_params, dataset_copy, previous_operations
)
try:
# execute operation
dataset_copy = self._execute_operation(
operation_params, dataset_copy, previous_operations
)
except Exception as e:
raise OperationError(
f"Failed to execute rule operation. "
f"Operation: {operation_params.operation_name}, "
f"Target: {target}, Domain: {domain}, "
f"Error: {str(e)}"
)
previous_operations.append(operation_params.operation_name)

logger.info(
Expand Down
Loading
Loading