From 4d8b17aed2c5fb4ab9ad47b6402373e363219a33 Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Wed, 19 Nov 2025 15:59:19 -0500 Subject: [PATCH 1/5] current --- .../exceptions/custom_exceptions.py | 8 +++++ cdisc_rules_engine/rules_engine.py | 31 ++++++++++++++++++ .../utilities/dataset_preprocessor.py | 32 ++++++++++++++++--- 3 files changed, 66 insertions(+), 5 deletions(-) diff --git a/cdisc_rules_engine/exceptions/custom_exceptions.py b/cdisc_rules_engine/exceptions/custom_exceptions.py index 45c6ea06c..ffe04527d 100644 --- a/cdisc_rules_engine/exceptions/custom_exceptions.py +++ b/cdisc_rules_engine/exceptions/custom_exceptions.py @@ -88,3 +88,11 @@ class SchemaNotFoundError(EngineError): class InvalidSchemaProvidedError(EngineError): code = 400 description = "Failed to parse XMLSchema" + + +class PreprocessingError(EngineError): + description = "Error occurred during dataset preprocessing" + + +class OperationError(EngineError): + description = "Error occurred during operation execution" diff --git a/cdisc_rules_engine/rules_engine.py b/cdisc_rules_engine/rules_engine.py index 795f8d185..602892c0e 100644 --- a/cdisc_rules_engine/rules_engine.py +++ b/cdisc_rules_engine/rules_engine.py @@ -17,6 +17,8 @@ DomainNotFoundError, InvalidSchemaProvidedError, SchemaNotFoundError, + PreprocessingError, + OperationError, ) from cdisc_rules_engine.interfaces import ( CacheServiceInterface, @@ -516,6 +518,35 @@ def handle_validation_exceptions( # noqa message=exception.args[0], ) message = "rule execution error" + elif isinstance(exception, PreprocessingError): + error_obj = FailedValidationEntity( + dataset=os.path.basename(dataset_path), + error="Preprocessing failed", + message=str(exception), # All structured info in the message + ) + message = "rule evaluation skipped - preprocessing failed" + errors = [error_obj] + return ValidationErrorContainer( + dataset=os.path.basename(dataset_path), + errors=errors, + message=message, + status=ExecutionStatus.SKIPPED.value, + ) + + elif isinstance(exception, OperationError): + error_obj = FailedValidationEntity( + dataset=os.path.basename(dataset_path), + error="Operation execution failed", + message=str(exception), # All structured info in the message + ) + message = "rule evaluation skipped - operation failed" + errors = [error_obj] + return ValidationErrorContainer( + dataset=os.path.basename(dataset_path), + errors=errors, + message=message, + status=ExecutionStatus.SKIPPED.value, + ) elif isinstance(exception, FailedSchemaValidation): if self.validate_xml: error_obj = FailedValidationEntity( diff --git a/cdisc_rules_engine/utilities/dataset_preprocessor.py b/cdisc_rules_engine/utilities/dataset_preprocessor.py index aca3dead9..2dc3a93ab 100644 --- a/cdisc_rules_engine/utilities/dataset_preprocessor.py +++ b/cdisc_rules_engine/utilities/dataset_preprocessor.py @@ -20,6 +20,16 @@ import pandas as pd +class PreprocessingError(Exception): + """ + Custom exception for preprocessing failures that should trigger rule skipping. + This exception is caught by rules_engine.handle_validation_exceptions() and + returns a SKIPPED status with the exception message shown to the user. + """ + + pass + + class DatasetPreprocessor: """ The class is responsible for preprocessing the dataset @@ -49,10 +59,6 @@ def __init__( def preprocess( # noqa self, rule: dict, datasets: Iterable[SDTMDatasetMetadata] ) -> DatasetInterface: - """ - Preprocesses the dataset by merging it with the - datasets from the provided rule. - """ rule_datasets: List[dict] = rule.get("datasets") if not rule_datasets: return self._dataset # nothing to preprocess @@ -107,11 +113,27 @@ def preprocess( # noqa ) ) ] + + if not file_infos: + raise PreprocessingError( + f"Required dataset '{domain_name}' not found for merging with " + f"{self._dataset_metadata.domain}. Rule is not applicable to this study." + ) + for file_info in file_infos: if file_info.domain in merged_domains: continue + filename = get_dataset_name_from_details(file_info) - other_dataset: DatasetInterface = self._download_dataset(filename) + + # Try to download the dataset + try: + other_dataset: DatasetInterface = self._download_dataset(filename) + except Exception as e: + raise PreprocessingError( + f"Failed to load required dataset '{filename}' for merging: {str(e)}" + ) + referenced_targets = set( [ target.replace(f"{domain_name}.", "") From 787d83fd16d148e6ba35a4be63c0af098c5bcde6 Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Thu, 20 Nov 2025 10:43:35 -0500 Subject: [PATCH 2/5] work --- .../utilities/dataset_preprocessor.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/cdisc_rules_engine/utilities/dataset_preprocessor.py b/cdisc_rules_engine/utilities/dataset_preprocessor.py index 2dc3a93ab..36a614014 100644 --- a/cdisc_rules_engine/utilities/dataset_preprocessor.py +++ b/cdisc_rules_engine/utilities/dataset_preprocessor.py @@ -16,20 +16,11 @@ get_sided_match_keys, get_dataset_name_from_details, ) +from cdisc_rules_engine.exceptions.custom_exceptions import PreprocessingError import os import pandas as pd -class PreprocessingError(Exception): - """ - Custom exception for preprocessing failures that should trigger rule skipping. - This exception is caught by rules_engine.handle_validation_exceptions() and - returns a SKIPPED status with the exception message shown to the user. - """ - - pass - - class DatasetPreprocessor: """ The class is responsible for preprocessing the dataset @@ -116,8 +107,7 @@ def preprocess( # noqa if not file_infos: raise PreprocessingError( - f"Required dataset '{domain_name}' not found for merging with " - f"{self._dataset_metadata.domain}. Rule is not applicable to this study." + f"Required dataset '{domain_name}' not found for {self._dataset_metadata.name}" ) for file_info in file_infos: @@ -131,7 +121,7 @@ def preprocess( # noqa other_dataset: DatasetInterface = self._download_dataset(filename) except Exception as e: raise PreprocessingError( - f"Failed to load required dataset '{filename}' for merging: {str(e)}" + f"Failed to load dataset '{filename}': {str(e)}" ) referenced_targets = set( From a5ba60f771d221637da7b00641e176c98a6a64cb Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Thu, 20 Nov 2025 14:58:05 -0500 Subject: [PATCH 3/5] errors --- .../exceptions/custom_exceptions.py | 4 + cdisc_rules_engine/rules_engine.py | 36 +++- .../utilities/dataset_preprocessor.py | 195 ++++++++++++------ .../utilities/rule_processor.py | 24 ++- 4 files changed, 176 insertions(+), 83 deletions(-) diff --git a/cdisc_rules_engine/exceptions/custom_exceptions.py b/cdisc_rules_engine/exceptions/custom_exceptions.py index ffe04527d..94da31a67 100644 --- a/cdisc_rules_engine/exceptions/custom_exceptions.py +++ b/cdisc_rules_engine/exceptions/custom_exceptions.py @@ -96,3 +96,7 @@ class PreprocessingError(EngineError): class OperationError(EngineError): description = "Error occurred during operation execution" + + +class DatasetBuilderError(EngineError): + description = "Error occurred during dataset building" diff --git a/cdisc_rules_engine/rules_engine.py b/cdisc_rules_engine/rules_engine.py index 602892c0e..4e8c992da 100644 --- a/cdisc_rules_engine/rules_engine.py +++ b/cdisc_rules_engine/rules_engine.py @@ -19,6 +19,7 @@ SchemaNotFoundError, PreprocessingError, OperationError, + DatasetBuilderError, ) from cdisc_rules_engine.interfaces import ( CacheServiceInterface, @@ -304,7 +305,15 @@ def validate_rule( """ kwargs = {} builder = self.get_dataset_builder(rule, datasets, dataset_metadata) - dataset = builder.get_dataset() + try: + dataset = builder.get_dataset() + except Exception as e: + raise DatasetBuilderError( + f"Failed to build dataset for rule validation. " + f"Builder: {builder.__class__.__name__}, " + f"Dataset: {dataset_metadata.name}, " + f"Error: {str(e)}" + ) # Update rule for certain rule types # SPECIAL CASES FOR RULE TYPES ############################### # TODO: Handle these special cases better. @@ -521,10 +530,10 @@ def handle_validation_exceptions( # noqa elif isinstance(exception, PreprocessingError): error_obj = FailedValidationEntity( dataset=os.path.basename(dataset_path), - error="Preprocessing failed", - message=str(exception), # All structured info in the message + error=PreprocessingError.description, + message=str(exception), ) - message = "rule evaluation skipped - preprocessing failed" + message = "rule evaluation error - preprocessing failed" errors = [error_obj] return ValidationErrorContainer( dataset=os.path.basename(dataset_path), @@ -536,10 +545,10 @@ def handle_validation_exceptions( # noqa elif isinstance(exception, OperationError): error_obj = FailedValidationEntity( dataset=os.path.basename(dataset_path), - error="Operation execution failed", - message=str(exception), # All structured info in the message + error=OperationError.description, + message=str(exception), ) - message = "rule evaluation skipped - operation failed" + message = "rule evaluation error - operation failed" errors = [error_obj] return ValidationErrorContainer( dataset=os.path.basename(dataset_path), @@ -547,6 +556,19 @@ def handle_validation_exceptions( # noqa message=message, status=ExecutionStatus.SKIPPED.value, ) + elif isinstance(exception, DatasetBuilderError): + error_obj = FailedValidationEntity( + dataset=os.path.basename(dataset_path), + error=DatasetBuilderError.description, + message=str(exception), + ) + message = "rule evaluation error - evaluation dataset failed to build" + return ValidationErrorContainer( + dataset=os.path.basename(dataset_path), + errors=errors, + message=message, + status=ExecutionStatus.SKIPPED.value, + ) elif isinstance(exception, FailedSchemaValidation): if self.validate_xml: error_obj = FailedValidationEntity( diff --git a/cdisc_rules_engine/utilities/dataset_preprocessor.py b/cdisc_rules_engine/utilities/dataset_preprocessor.py index 36a614014..c9e4752c3 100644 --- a/cdisc_rules_engine/utilities/dataset_preprocessor.py +++ b/cdisc_rules_engine/utilities/dataset_preprocessor.py @@ -107,7 +107,7 @@ def preprocess( # noqa if not file_infos: raise PreprocessingError( - f"Required dataset '{domain_name}' not found for {self._dataset_metadata.name}" + f"Failed to find related dataset for '{domain_name}' in preprocessor" ) for file_info in file_infos: @@ -121,7 +121,7 @@ def preprocess( # noqa other_dataset: DatasetInterface = self._download_dataset(filename) except Exception as e: raise PreprocessingError( - f"Failed to load dataset '{filename}': {str(e)}" + f"Failed to download dataset '{filename}' for preprocessing: {str(e)}" ) referenced_targets = set( @@ -158,26 +158,35 @@ def preprocess( # noqa right_dataset_domain_details=domain_details, datasets=datasets, ) + merged_domains.add(file_info.domain) return result def _find_parent_dataset( self, datasets: Iterable[SDTMDatasetMetadata], domain_details: dict ) -> SDTMDatasetMetadata: matching_datasets = [] - if "RDOMAIN" in self._dataset.columns: - rdomain_column = self._dataset.data["RDOMAIN"] - unique_domains = set(rdomain_column.unique()) - for dataset in datasets: - if dataset.domain in unique_domains: - matching_datasets.append(dataset) - else: - match_keys = domain_details.get("match_key") - for dataset in datasets: - has_all_match_keys = all( - match_key in dataset.first_record for match_key in match_keys - ) - if has_all_match_keys: - matching_datasets.append(dataset) + try: + if "RDOMAIN" in self._dataset.columns: + rdomain_column = self._dataset.data["RDOMAIN"] + unique_domains = set(rdomain_column.unique()) + for dataset in datasets: + if dataset.domain in unique_domains: + matching_datasets.append(dataset) + else: + match_keys = domain_details.get("match_key") + for dataset in datasets: + has_all_match_keys = all( + match_key in dataset.first_record for match_key in match_keys + ) + if has_all_match_keys: + matching_datasets.append(dataset) + except Exception as e: + raise PreprocessingError( + f"Error during parent dataset search. " + f"Current dataset: {self._dataset_metadata.name}, " + f"Match keys: {domain_details.get('match_key')}, " + f"Error: {str(e)}" + ) if not matching_datasets: logger.warning( f"Child specified in match but no parent datasets found for: {domain_details}" @@ -226,14 +235,22 @@ def _child_merge_datasets( "--", right_dataset_domain_name, ) - result = left_dataset.merge( - right_dataset.data, - how="left", - left_on=left_dataset_match_keys, - right_on=right_dataset_match_keys, - suffixes=("", f".{right_dataset_domain_name}"), - ) - return result + try: + result = left_dataset.merge( + right_dataset.data, + how="left", + left_on=left_dataset_match_keys, + right_on=right_dataset_match_keys, + suffixes=("", f".{right_dataset_domain_name}"), + ) + return result + except Exception as e: + raise PreprocessingError( + f"Merge operation failed during child merge. " + f"Left dataset: {left_dataset_domain_name}, " + f"Right dataset: {right_dataset_domain_name}, " + f"Error: {str(e)}" + ) def _classify_dataset( self, dataset: DatasetInterface, metadata: SDTMDatasetMetadata @@ -251,19 +268,28 @@ def _merge_rdomain_dataset( right_dataset_domain_name: str, match_keys: List[str], ) -> DatasetInterface: - relevant_child_records = self._get_relevant_child_records( - left_dataset, right_dataset_domain_name - ) - merged_records = self._merge_with_idvar_logic( - relevant_child_records, - right_dataset, - left_dataset_domain_name, - right_dataset_domain_name, - match_keys, - ) - return self._update_dataset_with_merged_records( - left_dataset, relevant_child_records, merged_records - ) + try: + relevant_child_records = self._get_relevant_child_records( + left_dataset, right_dataset_domain_name + ) + merged_records = self._merge_with_idvar_logic( + relevant_child_records, + right_dataset, + left_dataset_domain_name, + right_dataset_domain_name, + match_keys, + ) + return self._update_dataset_with_merged_records( + left_dataset, relevant_child_records, merged_records + ) + except Exception as e: + raise PreprocessingError( + f"Failed to merge with IDVAR logic. " + f"Left dataset: {left_dataset_domain_name}, " + f"Right dataset: {right_dataset_domain_name}, " + f"Match keys: {match_keys}, " + f"Error: {str(e)}" + ) def _get_relevant_child_records( self, left_dataset: DatasetInterface, parent_domain: str @@ -469,7 +495,7 @@ def _update_dataset_with_merged_records( updated_data = pd.concat([remaining_records, merged_records], ignore_index=True) return self._dataset.__class__(data=updated_data) - def _merge_datasets( + def _merge_datasets( # noqa self, left_dataset: DatasetInterface, left_dataset_domain_name: str, @@ -499,37 +525,70 @@ def _merge_datasets( # merge datasets based on their type if right_dataset_domain_name == "RELREC": - result: DatasetInterface = DataProcessor.merge_relrec_datasets( - left_dataset=left_dataset, - left_dataset_domain_name=left_dataset_domain_name, - relrec_dataset=right_dataset, - datasets=datasets, - dataset_preprocessor=self, - wildcard=right_dataset_domain_details.get("wildcard"), - ) + try: + result: DatasetInterface = DataProcessor.merge_relrec_datasets( + left_dataset=left_dataset, + left_dataset_domain_name=left_dataset_domain_name, + relrec_dataset=right_dataset, + datasets=datasets, + dataset_preprocessor=self, + wildcard=right_dataset_domain_details.get("wildcard"), + ) + except Exception as e: + raise PreprocessingError( + f"Failed to merge RELREC dataset in preprocessing. " + f"Left dataset: {left_dataset_domain_name}, " + f"RELREC dataset: {right_dataset_domain_name}, " + f"Wildcard: {right_dataset_domain_details.get('wildcard')}, " + f"Match keys: {match_keys}, " + f"Error: {str(e)}" + ) elif right_dataset_domain_name == "SUPP--": - result: DatasetInterface = DataProcessor.merge_pivot_supp_dataset( - dataset_implementation=self._data_service.dataset_implementation, - left_dataset=left_dataset, - right_dataset=right_dataset, - ) + try: + result: DatasetInterface = DataProcessor.merge_pivot_supp_dataset( + dataset_implementation=self._data_service.dataset_implementation, + left_dataset=left_dataset, + right_dataset=right_dataset, + ) + except Exception as e: + raise PreprocessingError( + f"Failed to merge supplemental/qualifier dataset. " + f"Left dataset: {left_dataset_domain_name} ({len(left_dataset)} rows), " + f"SUPP/SQ dataset: {right_dataset_domain_name} ({len(right_dataset)} rows), " + f"Error: {str(e)}" + ) elif self._rule_processor.is_relationship_dataset(right_dataset_domain_name): - result: DatasetInterface = DataProcessor.merge_relationship_datasets( - left_dataset=left_dataset, - left_dataset_match_keys=left_dataset_match_keys, - right_dataset=right_dataset, - right_dataset_match_keys=right_dataset_match_keys, - right_dataset_domain=right_dataset_domain_details, - ) + try: + result: DatasetInterface = DataProcessor.merge_relationship_datasets( + left_dataset=left_dataset, + left_dataset_match_keys=left_dataset_match_keys, + right_dataset=right_dataset, + right_dataset_match_keys=right_dataset_match_keys, + right_dataset_domain=right_dataset_domain_details, + ) + except Exception as e: + raise PreprocessingError( + f"Failed to merge relationship dataset. " + f"Left dataset: {left_dataset_domain_name} ({len(left_dataset)} rows), " + f"SUPP/SQ dataset: {right_dataset_domain_name} ({len(right_dataset)} rows), " + f"Error: {str(e)}" + ) else: - result: DatasetInterface = DataProcessor.merge_sdtm_datasets( - left_dataset=left_dataset, - right_dataset=right_dataset, - left_dataset_match_keys=left_dataset_match_keys, - right_dataset_match_keys=right_dataset_match_keys, - right_dataset_domain_name=right_dataset_domain_name, - join_type=JoinTypes( - right_dataset_domain_details.get("join_type", "inner") - ), - ) + try: + result: DatasetInterface = DataProcessor.merge_sdtm_datasets( + left_dataset=left_dataset, + right_dataset=right_dataset, + left_dataset_match_keys=left_dataset_match_keys, + right_dataset_match_keys=right_dataset_match_keys, + right_dataset_domain_name=right_dataset_domain_name, + join_type=JoinTypes( + right_dataset_domain_details.get("join_type", "inner") + ), + ) + except Exception as e: + raise PreprocessingError( + f"Failed to merge datasets. " + f"Left dataset: {left_dataset_domain_name}, Right dataset: {right_dataset_domain_name}, " + f"Error: {str(e)}" + ) return result diff --git a/cdisc_rules_engine/utilities/rule_processor.py b/cdisc_rules_engine/utilities/rule_processor.py index cc21d7ad8..c78236484 100644 --- a/cdisc_rules_engine/utilities/rule_processor.py +++ b/cdisc_rules_engine/utilities/rule_processor.py @@ -1,4 +1,7 @@ import re +import copy +import os + from typing import Iterable, List, Optional, Set, Union, Tuple from cdisc_rules_engine.enums.rule_types import RuleTypes from cdisc_rules_engine.interfaces.cache_service_interface import ( @@ -11,9 +14,6 @@ from cdisc_rules_engine.models.library_metadata_container import ( LibraryMetadataContainer, ) - -import copy -import os from cdisc_rules_engine.constants.classes import ( FINDINGS_ABOUT, FINDINGS, @@ -28,6 +28,7 @@ from cdisc_rules_engine.interfaces import ConditionInterface from cdisc_rules_engine.models.operation_params import OperationParams from cdisc_rules_engine.models.rule_conditions import AllowedConditionsKeys +from cdisc_rules_engine.exceptions.custom_exceptions import OperationError from cdisc_rules_engine.operations import operations_factory from cdisc_rules_engine.services import logger from cdisc_rules_engine.utilities.data_processor import DataProcessor @@ -424,11 +425,18 @@ def perform_rule_operations( value_is_reference=operation.get("value_is_reference", False), delimiter=operation.get("delimiter"), ) - - # execute operation - dataset_copy = self._execute_operation( - operation_params, dataset_copy, previous_operations - ) + try: + # execute operation + dataset_copy = self._execute_operation( + operation_params, dataset_copy, previous_operations + ) + except Exception as e: + raise OperationError( + f"Failed to execute rule operation. " + f"Operation: {operation_params.operation_name}, " + f"Target: {target}, Domain: {domain}, " + f"Error: {str(e)}" + ) previous_operations.append(operation_params.operation_name) logger.info( From 39e0f5621e337c28329c920f900fea2d789dc541 Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Thu, 20 Nov 2025 15:16:55 -0500 Subject: [PATCH 4/5] fix --- cdisc_rules_engine/rules_engine.py | 1 + tests/unit/test_rules_engine.py | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cdisc_rules_engine/rules_engine.py b/cdisc_rules_engine/rules_engine.py index 4e8c992da..90253697b 100644 --- a/cdisc_rules_engine/rules_engine.py +++ b/cdisc_rules_engine/rules_engine.py @@ -563,6 +563,7 @@ def handle_validation_exceptions( # noqa message=str(exception), ) message = "rule evaluation error - evaluation dataset failed to build" + errors = [error_obj] return ValidationErrorContainer( dataset=os.path.basename(dataset_path), errors=errors, diff --git a/tests/unit/test_rules_engine.py b/tests/unit/test_rules_engine.py index 24183c0ff..06eb51198 100644 --- a/tests/unit/test_rules_engine.py +++ b/tests/unit/test_rules_engine.py @@ -1292,16 +1292,18 @@ def test_validate_single_dataset_not_equal_to( ), [ { - "executionStatus": "execution_error", + "executionStatus": "skipped", "dataset": "ae.xpt", "domain": "AE", "variables": [], - "message": "rule execution error", + "message": "rule evaluation error - evaluation dataset failed to build", "errors": [ { "dataset": "ae.xpt", - "error": "An unknown exception has occurred", - "message": "single positional indexer is out-of-bounds", + "error": "Error occurred during dataset building", + "message": "Failed to build dataset for rule validation. Builder: " + "DatasetMetadataDefineDatasetBuilder, Dataset: AE, " + "Error: single positional indexer is out-of-bounds", } ], } From 761427497e8338ac376a8de0f1d3b9ad4a0652a1 Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Mon, 24 Nov 2025 15:35:31 -0500 Subject: [PATCH 5/5] update preprocessor --- .../utilities/dataset_preprocessor.py | 40 ++++--------------- 1 file changed, 7 insertions(+), 33 deletions(-) diff --git a/cdisc_rules_engine/utilities/dataset_preprocessor.py b/cdisc_rules_engine/utilities/dataset_preprocessor.py index 79f309623..f1ebc647c 100644 --- a/cdisc_rules_engine/utilities/dataset_preprocessor.py +++ b/cdisc_rules_engine/utilities/dataset_preprocessor.py @@ -50,6 +50,10 @@ def __init__( def preprocess( # noqa self, rule: dict, datasets: Iterable[SDTMDatasetMetadata] ) -> DatasetInterface: + """ + Preprocesses the dataset by merging it with the + datasets from the provided rule. + """ rule_datasets: List[dict] = rule.get("datasets") if not rule_datasets: return self._dataset # nothing to preprocess @@ -543,7 +547,9 @@ def _merge_datasets( # noqa f"Match keys: {match_keys}, " f"Error: {str(e)}" ) - elif right_dataset_domain_name == "SUPP--": + elif right_dataset_domain_name.startswith( + "SUPP" + ) or right_dataset_domain_name.startswith("SQ"): try: result: DatasetInterface = DataProcessor.merge_pivot_supp_dataset( dataset_implementation=self._data_service.dataset_implementation, @@ -557,38 +563,6 @@ def _merge_datasets( # noqa f"SUPP/SQ dataset: {right_dataset_domain_name} ({len(right_dataset)} rows), " f"Error: {str(e)}" ) - elif self._rule_processor.is_relationship_dataset(right_dataset_domain_name): - try: - result: DatasetInterface = DataProcessor.merge_relationship_datasets( - left_dataset=left_dataset, - left_dataset_match_keys=left_dataset_match_keys, - right_dataset=right_dataset, - right_dataset_match_keys=right_dataset_match_keys, - right_dataset_domain=right_dataset_domain_details, - ) - except Exception as e: - raise PreprocessingError( - f"Failed to merge relationship dataset. " - f"Left dataset: {left_dataset_domain_name} ({len(left_dataset)} rows), " - f"SUPP/SQ dataset: {right_dataset_domain_name} ({len(right_dataset)} rows), " - f"Error: {str(e)}" - ) - result: DatasetInterface = DataProcessor.merge_relrec_datasets( - left_dataset=left_dataset, - left_dataset_domain_name=left_dataset_domain_name, - relrec_dataset=right_dataset, - datasets=datasets, - dataset_preprocessor=self, - wildcard=right_dataset_domain_details.get("wildcard"), - ) - elif right_dataset_domain_name.startswith( - "SUPP" - ) or right_dataset_domain_name.startswith("SQ"): - result = DataProcessor.merge_pivot_supp_dataset( - dataset_implementation=self._data_service.dataset_implementation, - left_dataset=left_dataset, - right_dataset=right_dataset, - ) else: try: result: DatasetInterface = DataProcessor.merge_sdtm_datasets(