From c8d441b45952688818927fe6a2b1174c95c41ef0 Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Thu, 13 Nov 2025 15:01:47 -0500 Subject: [PATCH 1/9] draft --- .../utilities/data_processor.py | 101 +++++++++++++----- 1 file changed, 76 insertions(+), 25 deletions(-) diff --git a/cdisc_rules_engine/utilities/data_processor.py b/cdisc_rules_engine/utilities/data_processor.py index 7d871b562..002744a4a 100644 --- a/cdisc_rules_engine/utilities/data_processor.py +++ b/cdisc_rules_engine/utilities/data_processor.py @@ -390,38 +390,89 @@ def merge_pivot_supp_dataset( static_keys = ["STUDYID", "USUBJID", "APID", "POOLID", "SPDEVID"] qnam_list = right_dataset["QNAM"].unique() right_dataset = DataProcessor.process_supp(right_dataset) - dynamic_key = right_dataset["IDVAR"].iloc[0] + unique_idvar_values = right_dataset["IDVAR"].unique() + if len(unique_idvar_values) == 1: + dynamic_key = right_dataset["IDVAR"].iloc[0] + # Determine the common keys present in both datasets + common_keys = [ + key + for key in static_keys + if key in left_dataset.columns and key in right_dataset.columns + ] + common_keys.append(dynamic_key) + current_supp = right_dataset.rename(columns={"IDVARVAL": dynamic_key}) + current_supp = current_supp.drop(columns=["IDVAR"]) + left_dataset[dynamic_key] = left_dataset[dynamic_key].astype(str) + current_supp[dynamic_key] = current_supp[dynamic_key].astype(str) + left_dataset = PandasDataset( + pd.merge( + left_dataset.data, + current_supp.data, + how="left", + on=common_keys, + suffixes=("", "_supp"), + ) + ) + for qnam in qnam_list: + qnam_check = left_dataset.data.dropna(subset=[qnam]) + grouped = qnam_check.groupby(common_keys).size() + if (grouped > 1).any(): + raise ValueError( + f"Multiple records with the same QNAM '{qnam}' match a single parent record" + ) + else: + left_dataset = DataProcessor._merge_supp_with_multiple_idvars( + left_dataset, right_dataset, static_keys, qnam_list + ) + if dataset_implementation == DaskDataset: + left_dataset = DaskDataset(left_dataset.data) + return left_dataset - # Determine the common keys present in both datasets - common_keys = [ - key - for key in static_keys - if key in left_dataset.columns and key in right_dataset.columns - ] - common_keys.append(dynamic_key) - current_supp = right_dataset.rename(columns={"IDVARVAL": dynamic_key}) - current_supp = current_supp.drop(columns=["IDVAR"]) - left_dataset[dynamic_key] = left_dataset[dynamic_key].astype(str) - current_supp[dynamic_key] = current_supp[dynamic_key].astype(str) - left_dataset = PandasDataset( - pd.merge( - left_dataset.data, - current_supp.data, - how="left", - on=common_keys, - suffixes=("", "_supp"), + @staticmethod + def _merge_supp_with_multiple_idvars( + left_dataset: DatasetInterface, + right_dataset: DatasetInterface, + static_keys: List[str], + qnam_list: list, + ) -> DatasetInterface: + idvar_groups = right_dataset.groupby("IDVAR") + result_dataset = left_dataset + for idvar_value, group_data in idvar_groups: + common_keys = [ + key + for key in static_keys + if key in result_dataset.columns and key in group_data.columns + ] + common_keys.append(idvar_value) + current_supp = group_data.rename(columns={"IDVARVAL": idvar_value}) + current_supp = current_supp.drop(columns=["IDVAR"]) + result_dataset[idvar_value] = result_dataset[idvar_value].astype(str) + current_supp[idvar_value] = current_supp[idvar_value].astype(str) + result_dataset = PandasDataset( + pd.merge( + result_dataset.data, + current_supp.data, + how="left", + on=common_keys, + suffixes=("", "_supp"), + ) ) - ) for qnam in qnam_list: - qnam_check = left_dataset.data.dropna(subset=[qnam]) - grouped = qnam_check.groupby(common_keys).size() + qnam_check = result_dataset.data.dropna(subset=[qnam]) + validation_keys = [ + key for key in static_keys if key in result_dataset.columns + ] + for idvar_value in right_dataset["IDVAR"].unique(): + if idvar_value in result_dataset.columns: + validation_keys.append(idvar_value) + + grouped = qnam_check.groupby(validation_keys).size() if (grouped > 1).any(): raise ValueError( f"Multiple records with the same QNAM '{qnam}' match a single parent record" ) - if dataset_implementation == DaskDataset: - left_dataset = DaskDataset(left_dataset.data) - return left_dataset + + return result_dataset @staticmethod def process_supp(supp_dataset): From 0da72e7ded9c3f6dfffa813bd695e3edac655061 Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Fri, 14 Nov 2025 09:04:45 -0500 Subject: [PATCH 2/9] iterative multi idvar bug fix --- .../utilities/data_processor.py | 91 ++++++++++++++++--- .../utilities/dataset_preprocessor.py | 2 +- 2 files changed, 81 insertions(+), 12 deletions(-) diff --git a/cdisc_rules_engine/utilities/data_processor.py b/cdisc_rules_engine/utilities/data_processor.py index 002744a4a..66490f7b0 100644 --- a/cdisc_rules_engine/utilities/data_processor.py +++ b/cdisc_rules_engine/utilities/data_processor.py @@ -389,9 +389,9 @@ def merge_pivot_supp_dataset( static_keys = ["STUDYID", "USUBJID", "APID", "POOLID", "SPDEVID"] qnam_list = right_dataset["QNAM"].unique() - right_dataset = DataProcessor.process_supp(right_dataset) unique_idvar_values = right_dataset["IDVAR"].unique() if len(unique_idvar_values) == 1: + right_dataset = DataProcessor.process_supp(right_dataset) dynamic_key = right_dataset["IDVAR"].iloc[0] # Determine the common keys present in both datasets common_keys = [ @@ -435,39 +435,108 @@ def _merge_supp_with_multiple_idvars( static_keys: List[str], qnam_list: list, ) -> DatasetInterface: - idvar_groups = right_dataset.groupby("IDVAR") result_dataset = left_dataset - for idvar_value, group_data in idvar_groups: + + # Process each IDVAR group separately + for idvar_value in right_dataset["IDVAR"].unique(): + # Filter to just this IDVAR group + group_data = right_dataset.data[ + right_dataset.data["IDVAR"] == idvar_value + ].copy() + + print(f"\n=== Processing IDVAR: {idvar_value} ===") + print(f"Group has {len(group_data)} rows") + + # Pivot QNAM/QVAL into columns (mimicking process_supp for this group) + group_qnam_list = group_data["QNAM"].unique() + for qnam in group_qnam_list: + group_data[qnam] = pd.NA + + for index, row in group_data.iterrows(): + group_data.at[index, row["QNAM"]] = row["QVAL"] + + # NOW drop the columns (fixing the process_supp bug) + group_data = group_data.drop(columns=["QNAM", "QVAL", "QLABEL", "IDVAR"]) + + # Rename IDVARVAL to the actual merge key + group_data = group_data.rename(columns={"IDVARVAL": idvar_value}) + + print(f"Unique QNAM columns: {list(group_qnam_list)}") + + # Determine merge keys (same as single IDVAR case) common_keys = [ key for key in static_keys if key in result_dataset.columns and key in group_data.columns ] common_keys.append(idvar_value) - current_supp = group_data.rename(columns={"IDVARVAL": idvar_value}) - current_supp = current_supp.drop(columns=["IDVAR"]) + + # Handle multiple rows with same merge key (e.g., ECENDY=7 with ECLOC + ECSITE) + # Aggregate: take first non-null value for each column + agg_dict = { + col: lambda x: x.dropna().iloc[0] if not x.dropna().empty else pd.NA + for col in group_data.columns + if col not in common_keys + } + + group_data = group_data.groupby( + common_keys, as_index=False, dropna=False + ).agg(agg_dict) + + print(f"After aggregation: {len(group_data)} unique rows") + + # Drop columns that already exist in result (except merge keys) + # This prevents RDOMAIN_supp, QORIG_supp, QEVAL_supp duplicates + cols_to_drop = [ + col + for col in group_data.columns + if col in result_dataset.columns and col not in common_keys + ] + + if cols_to_drop: + print(f"Dropping already-present columns: {cols_to_drop}") + group_data = group_data.drop(columns=cols_to_drop) + + # Type conversion result_dataset[idvar_value] = result_dataset[idvar_value].astype(str) - current_supp[idvar_value] = current_supp[idvar_value].astype(str) + group_data[idvar_value] = group_data[idvar_value].astype(str) + + # Merge (exactly like single IDVAR case) result_dataset = PandasDataset( pd.merge( result_dataset.data, - current_supp.data, + group_data, how="left", on=common_keys, suffixes=("", "_supp"), ) ) + + # Validation logic for qnam in qnam_list: + if qnam not in result_dataset.columns: + continue + qnam_check = result_dataset.data.dropna(subset=[qnam]) + if len(qnam_check) == 0: + continue + + # Find the merge keys for this QNAM + idvar_for_qnam = right_dataset.data[right_dataset.data["QNAM"] == qnam][ + "IDVAR" + ].iloc[0] + validation_keys = [ key for key in static_keys if key in result_dataset.columns ] - for idvar_value in right_dataset["IDVAR"].unique(): - if idvar_value in result_dataset.columns: - validation_keys.append(idvar_value) + validation_keys.append(idvar_for_qnam) + print(f"\nValidating QNAM '{qnam}' with keys: {validation_keys}") grouped = qnam_check.groupby(validation_keys).size() + if (grouped > 1).any(): + print(f"DUPLICATES FOUND for {qnam}:") + print(grouped[grouped > 1]) raise ValueError( f"Multiple records with the same QNAM '{qnam}' match a single parent record" ) @@ -483,7 +552,7 @@ def process_supp(supp_dataset): # Set the value of the new columns only in their respective rows for index, row in supp_dataset.iterrows(): supp_dataset.at[index, row["QNAM"]] = row["QVAL"] - supp_dataset.drop(labels=["QNAM", "QVAL", "QLABEL"], axis=1) + supp_dataset = supp_dataset.drop(labels=["QNAM", "QVAL", "QLABEL"], axis=1) return supp_dataset @staticmethod diff --git a/cdisc_rules_engine/utilities/dataset_preprocessor.py b/cdisc_rules_engine/utilities/dataset_preprocessor.py index aca3dead9..b054a9d72 100644 --- a/cdisc_rules_engine/utilities/dataset_preprocessor.py +++ b/cdisc_rules_engine/utilities/dataset_preprocessor.py @@ -495,7 +495,7 @@ def _merge_datasets( dataset_preprocessor=self, wildcard=right_dataset_domain_details.get("wildcard"), ) - elif right_dataset_domain_name == "SUPP--": + elif right_dataset_domain_name.startswith("SUPP"): result: DatasetInterface = DataProcessor.merge_pivot_supp_dataset( dataset_implementation=self._data_service.dataset_implementation, left_dataset=left_dataset, From fb428731b73716a6f340a79ffc33e078b2357b7b Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Fri, 14 Nov 2025 14:47:14 -0500 Subject: [PATCH 3/9] tests --- .../utilities/data_processor.py | 5 - tests/unit/test_merge_supp_datasets.py | 173 ++++++++++++++++++ 2 files changed, 173 insertions(+), 5 deletions(-) diff --git a/cdisc_rules_engine/utilities/data_processor.py b/cdisc_rules_engine/utilities/data_processor.py index 66490f7b0..f6b84786b 100644 --- a/cdisc_rules_engine/utilities/data_processor.py +++ b/cdisc_rules_engine/utilities/data_processor.py @@ -439,14 +439,9 @@ def _merge_supp_with_multiple_idvars( # Process each IDVAR group separately for idvar_value in right_dataset["IDVAR"].unique(): - # Filter to just this IDVAR group group_data = right_dataset.data[ right_dataset.data["IDVAR"] == idvar_value ].copy() - - print(f"\n=== Processing IDVAR: {idvar_value} ===") - print(f"Group has {len(group_data)} rows") - # Pivot QNAM/QVAL into columns (mimicking process_supp for this group) group_qnam_list = group_data["QNAM"].unique() for qnam in group_qnam_list: diff --git a/tests/unit/test_merge_supp_datasets.py b/tests/unit/test_merge_supp_datasets.py index 8ab6e026a..370bdb728 100644 --- a/tests/unit/test_merge_supp_datasets.py +++ b/tests/unit/test_merge_supp_datasets.py @@ -123,3 +123,176 @@ def dummy_func(dataset_name, **kwargs): ) assert len(merged_dataset.data) == len(parent_dataset.data), "The" " length of the merged dataset should match the parent dataset." + + +@patch.object(LocalDataService, "check_filepath", return_value=False) +@patch.object(LocalDataService, "_async_get_datasets") +def test_merge_supp_dataset_multi_idvar(mock_async_get_datasets, data_service): + parent_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1", "STUDY1", "STUDY1"], + "USUBJID": ["001", "001", "002", "002"], + "ECSEQ": ["1", "2", "1", "2"], + "ECENDY": ["5", "7", "5", "7"], + "ECTRT": ["Treatment A", "Treatment B", "Treatment A", "Treatment B"], + } + ) + ) + + supp_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1", "STUDY1", "STUDY1", "STUDY1", "STUDY1"], + "USUBJID": ["001", "001", "001", "001", "002", "002"], + "RDOMAIN": ["EC", "EC", "EC", "EC", "EC", "EC"], + "IDVAR": ["ECSEQ", "ECSEQ", "ECENDY", "ECENDY", "ECSEQ", "ECENDY"], + "IDVARVAL": ["1", "2", "7", "7", "1", "5"], + "QNAM": ["ECLOC", "ECLOC", "ECSITE", "ECREGION", "ECLOC", "ECSITE"], + "QVAL": [ + "Left Arm", + "Right Arm", + "Site A", + "Region 1", + "Left Leg", + "Site B", + ], + "QLABEL": [ + "Location", + "Location", + "Site", + "Region", + "Location", + "Site", + ], + "QORIG": ["CRF", "CRF", "CRF", "CRF", "CRF", "CRF"], + "QEVAL": ["", "", "", "", "", ""], + } + ) + ) + + mock_async_get_datasets.return_value = [parent_dataset, supp_dataset] + merged_dataset = DataProcessor.merge_pivot_supp_dataset( + data_service.dataset_implementation, parent_dataset, supp_dataset + ) + assert "ECLOC" in merged_dataset.columns, "ECLOC column should be added from SUPP" + assert "ECSITE" in merged_dataset.columns, "ECSITE column should be added from SUPP" + assert ( + "ECREGION" in merged_dataset.columns + ), "ECREGION column should be added from SUPP" + + row1 = merged_dataset.data[ + (merged_dataset.data["USUBJID"] == "001") + & (merged_dataset.data["ECSEQ"] == "1") + ].iloc[0] + assert row1["ECLOC"] == "Left Arm", "ECLOC should match ECSEQ=1" + assert pd.isna(row1["ECSITE"]), "ECSITE should be NaN for ECSEQ=1" + + row2 = merged_dataset.data[ + (merged_dataset.data["USUBJID"] == "001") + & (merged_dataset.data["ECSEQ"] == "2") + ].iloc[0] + assert row2["ECLOC"] == "Right Arm", "ECLOC should match ECSEQ=2" + assert row2["ECSITE"] == "Site A", "ECSITE should match ECENDY=7" + assert row2["ECREGION"] == "Region 1", "ECREGION should match ECENDY=7" + + row3 = merged_dataset.data[ + (merged_dataset.data["USUBJID"] == "002") + & (merged_dataset.data["ECSEQ"] == "1") + ].iloc[0] + assert row3["ECLOC"] == "Left Leg", "ECLOC should match ECSEQ=1" + assert row3["ECSITE"] == "Site B", "ECSITE should match ECENDY=5" + assert len(merged_dataset.data) == len( + parent_dataset.data + ), "Merged dataset should have same number of rows as parent" + + +@patch.object(LocalDataService, "check_filepath", return_value=False) +@patch.object(LocalDataService, "_async_get_datasets") +def test_merge_supp_dataset_multi_idvar_aggregation( + mock_async_get_datasets, data_service +): + parent_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1"], + "USUBJID": ["001", "001"], + "ECSEQ": ["1", "2"], + "ECENDY": ["5", "7"], + "ECTRT": ["Treatment A", "Treatment B"], + } + ) + ) + supp_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1", "STUDY1"], + "USUBJID": ["001", "001", "001"], + "RDOMAIN": ["EC", "EC", "EC"], + "IDVAR": ["ECENDY", "ECENDY", "ECSEQ"], + "IDVARVAL": ["7", "7", "1"], + "QNAM": ["ECLOC", "ECSITE", "ECLOC"], + "QVAL": ["Location A", "Site A", "Location B"], + "QLABEL": ["Location", "Site", "Location"], + "QORIG": ["CRF", "CRF", "CRF"], + "QEVAL": ["", "", ""], + } + ) + ) + + mock_async_get_datasets.return_value = [parent_dataset, supp_dataset] + + merged_dataset = DataProcessor.merge_pivot_supp_dataset( + data_service.dataset_implementation, parent_dataset, supp_dataset + ) + + assert len(merged_dataset.data) == len( + parent_dataset.data + ), "Should not create duplicate rows when aggregating multiple QNAM for same IDVAR/IDVARVAL" + + # Both ECLOC and ECSITE from ECENDY=7 should be present in the ECSEQ=2 row + row = merged_dataset.data[merged_dataset.data["ECSEQ"] == "2"].iloc[0] + assert row["ECLOC"] == "Location A", "ECLOC from ECENDY=7 should be merged" + assert row["ECSITE"] == "Site A", "ECSITE from ECENDY=7 should be merged" + + +@patch.object(LocalDataService, "check_filepath", return_value=False) +@patch.object(LocalDataService, "_async_get_datasets") +def test_merge_supp_dataset_multi_idvar_same_qnam_validation_error( + mock_async_get_datasets, data_service +): + parent_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1"], + "USUBJID": ["001"], + "ECSEQ": ["1"], + "ECENDY": ["5"], + "ECTRT": ["Treatment A"], + } + ) + ) + + supp_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1"], + "USUBJID": ["001", "001"], + "RDOMAIN": ["EC", "EC"], + "IDVAR": ["ECSEQ", "ECSEQ"], + "IDVARVAL": ["1", "1"], + "QNAM": ["ECLOC", "ECLOC"], + "QVAL": ["Location A", "Location B"], + "QLABEL": ["Location", "Location"], + "QORIG": ["CRF", "CRF"], + "QEVAL": ["", ""], + } + ) + ) + + mock_async_get_datasets.return_value = [parent_dataset, supp_dataset] + + with pytest.raises(ValueError, match="Multiple records with the same QNAM"): + DataProcessor.merge_pivot_supp_dataset( + data_service.dataset_implementation, parent_dataset, supp_dataset + ) From 5ea476d3def966627a019f4ccc3a5022e8bded41 Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Fri, 14 Nov 2025 14:57:57 -0500 Subject: [PATCH 4/9] cleanup --- .../utilities/data_processor.py | 35 ------------------- 1 file changed, 35 deletions(-) diff --git a/cdisc_rules_engine/utilities/data_processor.py b/cdisc_rules_engine/utilities/data_processor.py index f6b84786b..c5cb67573 100644 --- a/cdisc_rules_engine/utilities/data_processor.py +++ b/cdisc_rules_engine/utilities/data_processor.py @@ -442,23 +442,13 @@ def _merge_supp_with_multiple_idvars( group_data = right_dataset.data[ right_dataset.data["IDVAR"] == idvar_value ].copy() - # Pivot QNAM/QVAL into columns (mimicking process_supp for this group) group_qnam_list = group_data["QNAM"].unique() for qnam in group_qnam_list: group_data[qnam] = pd.NA - for index, row in group_data.iterrows(): group_data.at[index, row["QNAM"]] = row["QVAL"] - - # NOW drop the columns (fixing the process_supp bug) group_data = group_data.drop(columns=["QNAM", "QVAL", "QLABEL", "IDVAR"]) - - # Rename IDVARVAL to the actual merge key group_data = group_data.rename(columns={"IDVARVAL": idvar_value}) - - print(f"Unique QNAM columns: {list(group_qnam_list)}") - - # Determine merge keys (same as single IDVAR case) common_keys = [ key for key in static_keys @@ -466,37 +456,24 @@ def _merge_supp_with_multiple_idvars( ] common_keys.append(idvar_value) - # Handle multiple rows with same merge key (e.g., ECENDY=7 with ECLOC + ECSITE) - # Aggregate: take first non-null value for each column agg_dict = { col: lambda x: x.dropna().iloc[0] if not x.dropna().empty else pd.NA for col in group_data.columns if col not in common_keys } - group_data = group_data.groupby( common_keys, as_index=False, dropna=False ).agg(agg_dict) - - print(f"After aggregation: {len(group_data)} unique rows") - - # Drop columns that already exist in result (except merge keys) - # This prevents RDOMAIN_supp, QORIG_supp, QEVAL_supp duplicates cols_to_drop = [ col for col in group_data.columns if col in result_dataset.columns and col not in common_keys ] - if cols_to_drop: - print(f"Dropping already-present columns: {cols_to_drop}") group_data = group_data.drop(columns=cols_to_drop) - - # Type conversion result_dataset[idvar_value] = result_dataset[idvar_value].astype(str) group_data[idvar_value] = group_data[idvar_value].astype(str) - # Merge (exactly like single IDVAR case) result_dataset = PandasDataset( pd.merge( result_dataset.data, @@ -506,36 +483,24 @@ def _merge_supp_with_multiple_idvars( suffixes=("", "_supp"), ) ) - - # Validation logic for qnam in qnam_list: if qnam not in result_dataset.columns: continue - qnam_check = result_dataset.data.dropna(subset=[qnam]) if len(qnam_check) == 0: continue - - # Find the merge keys for this QNAM idvar_for_qnam = right_dataset.data[right_dataset.data["QNAM"] == qnam][ "IDVAR" ].iloc[0] - validation_keys = [ key for key in static_keys if key in result_dataset.columns ] validation_keys.append(idvar_for_qnam) - - print(f"\nValidating QNAM '{qnam}' with keys: {validation_keys}") grouped = qnam_check.groupby(validation_keys).size() - if (grouped > 1).any(): - print(f"DUPLICATES FOUND for {qnam}:") - print(grouped[grouped > 1]) raise ValueError( f"Multiple records with the same QNAM '{qnam}' match a single parent record" ) - return result_dataset @staticmethod From a0f3ada50921e90b3d4f13a4389a2fe4cb266ffb Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Fri, 14 Nov 2025 15:38:54 -0500 Subject: [PATCH 5/9] alphabetize requirement.txt --- requirements.txt | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/requirements.txt b/requirements.txt index 947ee21e7..4e130d38f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,25 +1,25 @@ business_rules_enhanced==1.4.8 +cachetools==6.1.0 cdisc-library-client==0.1.6 click==8.1.7 +dask[dataframe]==2024.6.0 +dask[array]==2024.6.0 +fastparquet==2024.2.0 importlib-metadata==8.5.0 jsonata-python==0.6.0 jsonpath-ng==1.6.1 jsonschema==4.18.5 +lxml==5.2.1 numpy~=1.26.0 odmlib==0.1.4 openpyxl==3.1.5 pandas==2.1.4 +psutil==6.1.1 pyinstaller==6.11.0 +Pympler==1.1 +pyreadstat==1.2.7 python-dotenv==1.0.0 pyyaml==6.0.2 redis==4.5.0 requests~=2.32.3 setuptools~=75.6.0 -cachetools==6.1.0 -Pympler==1.1 -psutil==6.1.1 -dask[dataframe]==2024.6.0 -dask[array]==2024.6.0 -pyreadstat==1.2.7 -fastparquet==2024.2.0 -lxml==5.2.1 \ No newline at end of file From dc1a77f60e27accbd18a3f59d3a09710743aaa72 Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Mon, 17 Nov 2025 13:01:04 -0500 Subject: [PATCH 6/9] removed is_relationship flag --- cdisc_rules_engine/models/rule.py | 8 - .../utilities/data_processor.py | 259 ++---------------- .../utilities/dataset_preprocessor.py | 14 +- tests/conftest.py | 8 +- .../CoreIssue747/Rule_underscores.json | 205 +++++++------- tests/unit/test_rule.py | 6 +- tests/unit/test_rules_engine.py | 9 +- 7 files changed, 128 insertions(+), 381 deletions(-) diff --git a/cdisc_rules_engine/models/rule.py b/cdisc_rules_engine/models/rule.py index 7a9fa3f6f..090af8d5a 100644 --- a/cdisc_rules_engine/models/rule.py +++ b/cdisc_rules_engine/models/rule.py @@ -184,12 +184,6 @@ def parse_actions(cls, actions_data: dict) -> List[dict]: @classmethod def parse_datasets(cls, match_key_data: List[dict]) -> List[dict]: - # Defaulting to IDVAR and IDVARVAL as relationship columns. - # May change in the future as more standard rules are written. - relationship_columns = { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - } if not match_key_data: return None datasets = [] @@ -206,8 +200,6 @@ def parse_datasets(cls, match_key_data: List[dict]) -> List[dict]: ], "wildcard": data.get("Wildcard", "**"), } - if data.get("Is_Relationship", False): - join_data["relationship_columns"] = relationship_columns if "Join_Type" in data: join_data["join_type"] = data.get("Join_Type") if "Child" in data: diff --git a/cdisc_rules_engine/utilities/data_processor.py b/cdisc_rules_engine/utilities/data_processor.py index c5cb67573..463117269 100644 --- a/cdisc_rules_engine/utilities/data_processor.py +++ b/cdisc_rules_engine/utilities/data_processor.py @@ -44,162 +44,6 @@ def convert_float_merge_keys(series: pd.Series) -> pd.Series: """ return series.astype(float, errors="ignore").astype(str) - @staticmethod - def filter_dataset_by_match_keys_of_other_dataset( - dataset: DatasetInterface, - dataset_match_keys: List[str], - other_dataset: DatasetInterface, - other_dataset_match_keys: List[str], - ) -> DatasetInterface: - """ - Returns a DataFrame where values of match keys of - dataset are equal to values of match keys of other dataset. - Example: - dataset = USUBJID DOMAIN - CDISC001 AE - CDISC001 AE - CDISC009 AE - dataset_match_keys = ["USUBJID"] - other_dataset = USUBJID DOMAIN - CDISC009 AE - other_dataset_match_keys = ["USUBJID"] - - The result will be: USUBJID DOMAIN - CDISC009 AE - """ - dataset_w_ind = dataset.set_index(dataset_match_keys) - other_dataset_w_ind = other_dataset.set_index(other_dataset_match_keys) - condition = dataset_w_ind.index.isin(other_dataset_w_ind.index) - return dataset_w_ind[condition].reset_index() - - @staticmethod - def filter_parent_dataset_by_supp_dataset( - parent_dataset: DatasetInterface, - supp_dataset: DatasetInterface, - column_with_names: str, - column_with_values: str, - ) -> DatasetInterface: - """ - A wrapper function for convenient filtering of parent dataset by supp dataset. - Does two things: - 1. Filters parent dataset by RDOMAIN column of supp dataset. - 2. Filters parent dataset by columns of supp dataset - that describe their relation. - """ - parent_dataset = DataProcessor.filter_parent_dataset_by_supp_dataset_rdomain( - parent_dataset, supp_dataset - ) - return DataProcessor.filter_dataset_by_nested_columns_of_other_dataset( - parent_dataset, supp_dataset, column_with_names, column_with_values - ) - - @staticmethod - def filter_parent_dataset_by_supp_dataset_rdomain( - parent_dataset: DatasetInterface, supp_dataset: DatasetInterface - ) -> DatasetInterface: - """ - Leaves only those rows in parent dataset - where DOMAIN is the same as RDOMAIN of supp dataset. - """ - parent_domain_values: pd.Series = parent_dataset.get("DOMAIN", pd.Series()) - supp_domain_values: pd.Series = supp_dataset.get("RDOMAIN", pd.Series()) - if parent_domain_values.empty or supp_domain_values.empty: - return parent_dataset - - return parent_dataset[parent_domain_values.isin(supp_domain_values)] - - @staticmethod - def filter_dataset_by_nested_columns_of_other_dataset( - dataset: DatasetInterface, - other_dataset: DatasetInterface, - column_with_names: str, - column_with_values: str, - ) -> DatasetInterface: - """ - other_dataset has two columns: - 1. list of column names which exist in dataset - column_with_names. - 2. list of values of the aforementioned columns - column_with_values. - - The function filters columns name of dataset by their respective column values. - - Example: - other_dataset = IDVAR IDVARVAL - ECSEQ 100 - ECSEQ 101 - ECNUM 105 - column_with_names = "IDVAR" - column_with_values = "IDVARVAL" - - We need to leave only those rows in dataset - where dataset["ECSEQ"] is equal to 100 or 101 - AND dataset["ECNUM"] is equal to 105. - """ - if ( - other_dataset[column_with_names].str.strip().eq("").all() - and other_dataset[column_with_values].str.strip().eq("").all() - ): - return dataset - grouped = other_dataset.groupby(column_with_names, group_keys=False) - - def filter_dataset_by_group_values(group) -> DatasetInterface: - decimal_group_values: pd.Series = DataProcessor.convert_float_merge_keys( - group[column_with_values] - ) - decimal_dataset_values: pd.Series = DataProcessor.convert_float_merge_keys( - dataset[group.name] - ) - condition: pd.Series = decimal_dataset_values.isin(decimal_group_values) - return dataset[condition] - - result = grouped.apply(lambda group: filter_dataset_by_group_values(group)) - # grouping breaks sorting, need to sort once again - return dataset.__class__(result.sort_values(list(grouped.groups.keys()))) - - @staticmethod - def merge_datasets_on_relationship_columns( - left_dataset: DatasetInterface, - left_dataset_match_keys: List[str], - right_dataset: DatasetInterface, - right_dataset_match_keys: List[str], - right_dataset_domain_name: str, - column_with_names: str, - column_with_values: str, - ) -> DatasetInterface: - """ - Uses full join to merge given datasets on the - columns that describe their relation. - """ - # right dataset holds column names of left dataset. - # all values in the column are the same - if ( - right_dataset[column_with_names].str.strip().eq("").all() - and right_dataset[column_with_values].str.strip().eq("").all() - ): - return left_dataset.merge( - other=right_dataset.data, - left_on=left_dataset_match_keys, - right_on=right_dataset_match_keys, - how="outer", - suffixes=("", f".{right_dataset_domain_name}"), - ) - left_ds_col_name: str = right_dataset[column_with_names][0] - - # convert numeric columns to one data type to avoid merging errors - # there is no point in converting string cols since their data type is the same - DataProcessor.cast_numeric_cols_to_same_data_type( - right_dataset, column_with_values, left_dataset, left_ds_col_name - ) - left_dataset_match_keys.append(left_ds_col_name) - right_dataset_match_keys.append(column_with_values) - - return left_dataset.merge( - other=right_dataset.data, - left_on=left_dataset_match_keys, - right_on=right_dataset_match_keys, - how="outer", - suffixes=("", f".{right_dataset_domain_name}"), - ) - @staticmethod def filter_if_present(df: DatasetInterface, col: str, filter_value): """ @@ -348,38 +192,6 @@ def merge_relrec_datasets( result = objs[0].concat(objs[1:], ignore_index=True) return result - @staticmethod - def merge_relationship_datasets( - left_dataset: DatasetInterface, - left_dataset_match_keys: List[str], - right_dataset: DatasetInterface, - right_dataset_match_keys: List[str], - right_dataset_domain: dict, - ) -> DatasetInterface: - result = DataProcessor.filter_dataset_by_match_keys_of_other_dataset( - left_dataset, - left_dataset_match_keys, - right_dataset, - right_dataset_match_keys, - ) - result = DataProcessor.filter_parent_dataset_by_supp_dataset( - result, - right_dataset, - **right_dataset_domain["relationship_columns"], - ) - - # convert result back to DatasetInterface class - result = left_dataset.__class__(result.reset_index(drop=True)) - result = DataProcessor.merge_datasets_on_relationship_columns( - left_dataset=result, - left_dataset_match_keys=left_dataset_match_keys, - right_dataset=right_dataset, - right_dataset_match_keys=right_dataset_match_keys, - right_dataset_domain_name=right_dataset_domain.get("domain"), - **right_dataset_domain["relationship_columns"], - ) - return result - @staticmethod def merge_pivot_supp_dataset( dataset_implementation: DatasetInterface, @@ -393,17 +205,24 @@ def merge_pivot_supp_dataset( if len(unique_idvar_values) == 1: right_dataset = DataProcessor.process_supp(right_dataset) dynamic_key = right_dataset["IDVAR"].iloc[0] + is_blank = pd.isna(dynamic_key) or str(dynamic_key).strip() == "" # Determine the common keys present in both datasets common_keys = [ key for key in static_keys if key in left_dataset.columns and key in right_dataset.columns ] - common_keys.append(dynamic_key) - current_supp = right_dataset.rename(columns={"IDVARVAL": dynamic_key}) - current_supp = current_supp.drop(columns=["IDVAR"]) - left_dataset[dynamic_key] = left_dataset[dynamic_key].astype(str) - current_supp[dynamic_key] = current_supp[dynamic_key].astype(str) + if not is_blank: + common_keys.append(dynamic_key) + current_supp = right_dataset.rename(columns={"IDVARVAL": dynamic_key}) + current_supp = current_supp.drop(columns=["IDVAR"]) + left_dataset[dynamic_key] = left_dataset[dynamic_key].astype(str) + current_supp[dynamic_key] = current_supp[dynamic_key].astype(str) + else: + columns_to_drop = [ + col for col in ["IDVAR", "IDVARVAL"] if col in right_dataset.columns + ] + current_supp = right_dataset.drop(columns=columns_to_drop) left_dataset = PandasDataset( pd.merge( left_dataset.data, @@ -512,7 +331,11 @@ def process_supp(supp_dataset): # Set the value of the new columns only in their respective rows for index, row in supp_dataset.iterrows(): supp_dataset.at[index, row["QNAM"]] = row["QVAL"] - supp_dataset = supp_dataset.drop(labels=["QNAM", "QVAL", "QLABEL"], axis=1) + columns_to_drop = [ + col for col in ["QNAM", "QVAL", "QLABEL"] if col in supp_dataset.columns + ] + if columns_to_drop: + supp_dataset = supp_dataset.drop(labels=columns_to_drop, axis=1) return supp_dataset @staticmethod @@ -551,54 +374,6 @@ def merge_sdtm_datasets( ] = None return result - @staticmethod - def cast_numeric_cols_to_same_data_type( - left_dataset: DatasetInterface, - left_dataset_column: str, - right_dataset: DatasetInterface, - right_dataset_column: str, - ): - """ - Casts given columns to one data type (float) - ONLY if they are numeric. - Before casting, the method ensures that both of the - columns hold numeric values and performs conversion - only if both of them are. - - Example 1: - left_dataset[left_dataset_column] = [1, 2, 3, 4, ] - right_dataset[right_dataset_column] = ["1.0", "2.0", "3.0", "4.0", ] - - Example 2: - left_dataset[left_dataset_column] = ["1", "2", "3", "4", ] - right_dataset[right_dataset_column] = ["1.0", "2.0", "3.0", "4.0", ] - - Example 3: - left_dataset[left_dataset_column] = ["1", "2", "3", "4", ] - right_dataset[right_dataset_column] = [1, 2, 3, 4, ] - """ - # check if both columns are numeric - left_is_numeric: bool = DataProcessor.column_contains_numeric( - left_dataset[left_dataset_column] - ) - right_is_numeric: bool = DataProcessor.column_contains_numeric( - right_dataset[right_dataset_column] - ) - if left_is_numeric and right_is_numeric: - # convert to float - right_dataset[right_dataset_column] = right_dataset[ - right_dataset_column - ].astype(float) - left_dataset[left_dataset_column] = left_dataset[ - left_dataset_column - ].astype(float) - - @staticmethod - def column_contains_numeric(column: pd.Series) -> bool: - if not pd.api.types.is_numeric_dtype(column): - return column.str.replace(".", "").str.isdigit().all() - return True - @staticmethod def filter_dataset_columns_by_metadata_and_rule( columns: List[str], diff --git a/cdisc_rules_engine/utilities/dataset_preprocessor.py b/cdisc_rules_engine/utilities/dataset_preprocessor.py index b054a9d72..a86437d69 100644 --- a/cdisc_rules_engine/utilities/dataset_preprocessor.py +++ b/cdisc_rules_engine/utilities/dataset_preprocessor.py @@ -495,20 +495,14 @@ def _merge_datasets( dataset_preprocessor=self, wildcard=right_dataset_domain_details.get("wildcard"), ) - elif right_dataset_domain_name.startswith("SUPP"): - result: DatasetInterface = DataProcessor.merge_pivot_supp_dataset( + elif right_dataset_domain_name.startswith( + "SUPP" + ) or right_dataset_domain_name.startswith("SQ"): + result = DataProcessor.merge_pivot_supp_dataset( dataset_implementation=self._data_service.dataset_implementation, left_dataset=left_dataset, right_dataset=right_dataset, ) - elif self._rule_processor.is_relationship_dataset(right_dataset_domain_name): - result: DatasetInterface = DataProcessor.merge_relationship_datasets( - left_dataset=left_dataset, - left_dataset_match_keys=left_dataset_match_keys, - right_dataset=right_dataset, - right_dataset_match_keys=right_dataset_match_keys, - right_dataset_domain=right_dataset_domain_details, - ) else: result: DatasetInterface = DataProcessor.merge_sdtm_datasets( left_dataset=left_dataset, diff --git a/tests/conftest.py b/tests/conftest.py index 2b79308b3..b0bed96db 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -876,10 +876,6 @@ def dataset_rule_record_in_parent_domain_equal_to() -> dict: { "domain_name": "SUPPEC", "match_key": ["USUBJID"], - "relationship_columns": { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - }, } ], "conditions": ConditionCompositeFactory.get_condition_composite( @@ -888,7 +884,7 @@ def dataset_rule_record_in_parent_domain_equal_to() -> dict: { "name": "get_dataset", "operator": "equal_to", - "value": {"target": "QNAM", "comparator": "ECREASOC"}, + "value": {"target": "ECREASOC", "comparator": "Some Value 1"}, }, { "name": "get_dataset", @@ -905,7 +901,7 @@ def dataset_rule_record_in_parent_domain_equal_to() -> dict: } ], "output_variables": [ - "QNAM", + "ECREASOC", "ECPRESP", ], } diff --git a/tests/resources/CoreIssue747/Rule_underscores.json b/tests/resources/CoreIssue747/Rule_underscores.json index 709018a6b..ccaab488b 100755 --- a/tests/resources/CoreIssue747/Rule_underscores.json +++ b/tests/resources/CoreIssue747/Rule_underscores.json @@ -1,118 +1,111 @@ { - "Authorities": [ + "Authorities": [ + { + "Organization": "CDISC", + "Standards": [ { - "Organization": "CDISC", - "Standards": [ - { - "Name": "SDTMIG", - "References": [ - { - "Citations": [ - { - "Document": "IG v3.4", - "Section": "Table 3.2.1", - "Cited_Guidance": "Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different." - }, - { - "Document": "IG v3.4", - "Section": "3.2.1.1", - "Cited_Guidance": "Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order. The identified keys for each dataset should be consistent with the description of the dataset structure as described in the Define-XML document." - } - ], - "Origin": "SDTM and SDTMIG Conformance Rules", - "Version": "2.0", - "Rule_Identifier": { - "Id": "CG0019", - "Version": "1" - } - } - ], - "Version": "3.4" - }, + "Name": "SDTMIG", + "References": [ + { + "Citations": [ { - "Name": "SDTMIG", - "References": [ - { - "Citations": [ - { - "Document": "IG v3.2", - "Section": "Table 3.2.1|3.2.1.1", - "Cited_Guidance": "Table 3.2.1[Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different.]|3.2.1.1[Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order.]" - } - ], - "Origin": "SDTM and SDTMIG Conformance Rules", - "Version": "2.0", - "Rule_Identifier": { - "Id": "CG0019", - "Version": "1" - } - } - ], - "Version": "3.2" + "Document": "IG v3.4", + "Section": "Table 3.2.1", + "Cited_Guidance": "Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different." }, { - "Name": "SDTMIG", - "References": [ - { - "Citations": [ - { - "Document": "IG v3.3", - "Section": "Table 3.2.1|3.2.1.1", - "Cited_Guidance": "Table 3.2.1[Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different.]||3.2.1.1[Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order.]" - } - ], - "Origin": "SDTM and SDTMIG Conformance Rules", - "Version": "2.0", - "Rule_Identifier": { - "Id": "CG0019", - "Version": "1" - } - } - ], - "Version": "3.3" + "Document": "IG v3.4", + "Section": "3.2.1.1", + "Cited_Guidance": "Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order. The identified keys for each dataset should be consistent with the description of the dataset structure as described in the Define-XML document." } - ] - } - ], - "Check": { - "all": [ + ], + "Origin": "SDTM and SDTMIG Conformance Rules", + "Version": "2.0", + "Rule_Identifier": { + "Id": "CG0019", + "Version": "1" + } + } + ], + "Version": "3.4" + }, + { + "Name": "SDTMIG", + "References": [ { - "name": "define_dataset_key_sequence", - "operator": "is_not_unique_set" + "Citations": [ + { + "Document": "IG v3.2", + "Section": "Table 3.2.1|3.2.1.1", + "Cited_Guidance": "Table 3.2.1[Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different.]|3.2.1.1[Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order.]" + } + ], + "Origin": "SDTM and SDTMIG Conformance Rules", + "Version": "2.0", + "Rule_Identifier": { + "Id": "CG0019", + "Version": "1" + } } - ] - }, - "Core": { - "Id": "CDISC.SDTMIG.CG0019", - "Status": "Draft", - "Version": "1" - }, - "Description": "Trigger error if records are not unique as per sponsor defined key variables as documented in the define.xml", - "Executability": "Fully Executable", - "Outcome": { - "Message": "Records are not unique as per sponsor defined key variables as documented in the define.xml" - }, - "Scope": { - "Classes": { - "Include": [ - "ALL" - ] + ], + "Version": "3.2" }, - "Domains": { - "Include": [ - "ALL" - ] - } - }, - "Sensitivity": "Record", - "Match_Datasets": [ { - "Keys": [ - "USUBJID" - ], - "Name": "SUPP--", - "Is_Relationship": true + "Name": "SDTMIG", + "References": [ + { + "Citations": [ + { + "Document": "IG v3.3", + "Section": "Table 3.2.1|3.2.1.1", + "Cited_Guidance": "Table 3.2.1[Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different.]||3.2.1.1[Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order.]" + } + ], + "Origin": "SDTM and SDTMIG Conformance Rules", + "Version": "2.0", + "Rule_Identifier": { + "Id": "CG0019", + "Version": "1" + } + } + ], + "Version": "3.3" } - ], - "Rule_Type": "Dataset Contents Check against Define XML" + ] + } + ], + "Check": { + "all": [ + { + "name": "define_dataset_key_sequence", + "operator": "is_not_unique_set" + } + ] + }, + "Core": { + "Id": "CDISC.SDTMIG.CG0019", + "Status": "Draft", + "Version": "1" + }, + "Description": "Trigger error if records are not unique as per sponsor defined key variables as documented in the define.xml", + "Executability": "Fully Executable", + "Outcome": { + "Message": "Records are not unique as per sponsor defined key variables as documented in the define.xml" + }, + "Scope": { + "Classes": { + "Include": ["ALL"] + }, + "Domains": { + "Include": ["ALL"] + } + }, + "Sensitivity": "Record", + "Match_Datasets": [ + { + "Keys": ["USUBJID"], + "Name": "SUPP--" + } + ], + "Rule_Type": "Dataset Contents Check against Define XML" } diff --git a/tests/unit/test_rule.py b/tests/unit/test_rule.py index 48285a678..51244bcfc 100644 --- a/tests/unit/test_rule.py +++ b/tests/unit/test_rule.py @@ -86,15 +86,11 @@ def test_valid_parse_actions(): [{"domain_name": "AA", "match_key": ["USUBJID"], "wildcard": "**"}], ), ( - [{"Name": "SUPPEC", "Keys": ["USUBJID"], "Is_Relationship": True}], + [{"Name": "SUPPEC", "Keys": ["USUBJID"]}], [ { "domain_name": "SUPPEC", "match_key": ["USUBJID"], - "relationship_columns": { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - }, "wildcard": "**", } ], diff --git a/tests/unit/test_rules_engine.py b/tests/unit/test_rules_engine.py index 24183c0ff..7e8bf872e 100644 --- a/tests/unit/test_rules_engine.py +++ b/tests/unit/test_rules_engine.py @@ -1971,13 +1971,14 @@ def test_validate_record_in_parent_domain( "ECREASOC", "ECREASOS", ], + "QVAL": ["Some Value 1", "Some Value 2"], "IDVAR": [ "ECSEQ", "ECSEQ", ], "IDVARVAL": [ - "4.0", - "5.0", + "4", + "5", ], } ) @@ -2017,13 +2018,13 @@ def test_validate_record_in_parent_domain( "executionStatus": "success", "domain": "EC", "dataset": "ec.xpt", - "variables": ["ECPRESP", "QNAM"], + "variables": ["ECPRESP", "ECREASOC"], "message": "Dataset contents is wrong.", "errors": [ { "dataset": "ec.xpt", "row": 4, - "value": {"ECPRESP": "Y", "QNAM": "ECREASOC"}, + "value": {"ECPRESP": "Y", "ECREASOC": "Some Value 1"}, "USUBJID": "CDISC005", "SEQ": 4, } From e2b12f123ee56ef85d13c5548e29fa599137b89c Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Mon, 17 Nov 2025 13:24:38 -0500 Subject: [PATCH 7/9] tests --- tests/unit/test_dataset_preprocessor.py | 51 ++-- .../test_utilities/test_data_processor.py | 255 +----------------- 2 files changed, 24 insertions(+), 282 deletions(-) diff --git a/tests/unit/test_dataset_preprocessor.py b/tests/unit/test_dataset_preprocessor.py index ab5c456da..319038eff 100644 --- a/tests/unit/test_dataset_preprocessor.py +++ b/tests/unit/test_dataset_preprocessor.py @@ -972,7 +972,7 @@ def test_preprocess_with_merge_comparison( def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): """ Test preprocessing when SUPP dataset has blank IDVAR and IDVARVAL. - Should merge successfully without attempting float conversion. + Should pivot and merge on static keys. """ main_data = { "USUBJID": ["CDISC001", "CDISC002"], @@ -984,8 +984,8 @@ def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): supp_data = { "USUBJID": ["CDISC001", "CDISC002"], "RDOMAIN": ["AE", "AE"], - "IDVAR": ["", ""], # Blank IDVAR - "IDVARVAL": ["", ""], # Blank IDVARVAL + "IDVAR": ["", ""], + "IDVARVAL": ["", ""], "QNAM": ["AESPID", "AESPID"], "QVAL": ["SCREENING", "BASELINE"], } @@ -1005,10 +1005,6 @@ def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): { "domain_name": "SUPPAE", "match_key": ["USUBJID"], - "relationship_columns": { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - }, } ], "conditions": ConditionCompositeFactory.get_condition_composite( @@ -1017,7 +1013,7 @@ def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): { "name": "get_dataset", "operator": "equal_to", - "value": {"target": "QVAL", "comparator": "SCREENING"}, + "value": {"target": "AESPID", "comparator": "SCREENING"}, } ] } @@ -1030,9 +1026,17 @@ def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): ] result = preprocessor.preprocess(rule, datasets) assert len(result.data) == 2 - assert "QNAM" in result.data.columns - assert "QVAL" in result.data.columns - assert all(qnam == "AESPID" for qnam in result.data["QNAM"]) + assert "AESPID" in result.data.columns + assert "QNAM" not in result.data.columns + assert "QVAL" not in result.data.columns + assert ( + result.data[result.data["USUBJID"] == "CDISC001"]["AESPID"].values[0] + == "SCREENING" + ) + assert ( + result.data[result.data["USUBJID"] == "CDISC002"]["AESPID"].values[0] + == "BASELINE" + ) @patch("cdisc_rules_engine.services.data_services.LocalDataService.get_dataset") @@ -1155,10 +1159,6 @@ def test_preprocess_specific_suppae_dataset( { "domain_name": "SUPPAE", "match_key": ["USUBJID"], - "relationship_columns": { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - }, } ], "conditions": ConditionCompositeFactory.get_condition_composite( @@ -1167,7 +1167,7 @@ def test_preprocess_specific_suppae_dataset( { "name": "get_dataset", "operator": "equal_to", - "value": {"target": "QVAL", "comparator": "SP001"}, + "value": {"target": "AESPID", "comparator": "SP001"}, } ] } @@ -1184,20 +1184,15 @@ def test_preprocess_specific_suppae_dataset( data_service = LocalDataService(MagicMock(), MagicMock(), MagicMock()) preprocessor = DatasetPreprocessor( ae_dataset, - SDTMDatasetMetadata( - first_record={"DOMAIN": "AE"}, full_path=os.path.join("path", "ae.xpt") - ), + SDTMDatasetMetadata(first_record={"DOMAIN": "AE"}, full_path="path"), data_service, InMemoryCacheService(), ) result = preprocessor.preprocess(rule_with_specific_supp, datasets) - assert len(result.data) >= 1 - assert "USUBJID" in result.data.columns - assert "AETERM" in result.data.columns - assert "QNAM" in result.data.columns - assert "QVAL" in result.data.columns - qval_values = result.data["QVAL"].dropna().tolist() - assert "SP001" in qval_values - expected_path = os.path.join("path", "suppae.xpt") - mock_get_dataset.assert_called_with(dataset_name=expected_path) + + assert len(result.data) == 1 + assert "AESPID" in result.data.columns + assert result.data["AESPID"].values[0] == "SP001" + assert "QNAM" not in result.data.columns + assert "QVAL" not in result.data.columns diff --git a/tests/unit/test_utilities/test_data_processor.py b/tests/unit/test_utilities/test_data_processor.py index bee22056d..48bb805d2 100644 --- a/tests/unit/test_utilities/test_data_processor.py +++ b/tests/unit/test_utilities/test_data_processor.py @@ -2,7 +2,7 @@ import pandas as pd import pytest from cdisc_rules_engine.utilities.data_processor import DataProcessor -from cdisc_rules_engine.models.dataset import PandasDataset, DaskDataset +from cdisc_rules_engine.models.dataset import PandasDataset from cdisc_rules_engine.enums.join_types import JoinTypes @@ -53,259 +53,6 @@ def test_filter_dataset_columns_by_metadata_and_rule(): ] -@pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) -def test_merge_datasets_on_relationship_columns(dataset_implementation): - """ - Unit test for DataProcessor.merge_datasets_on_relationship_columns method. - """ - # prepare data - left_dataset = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - ], - "DOMAIN": [ - "AE", - "AE", - "AE", - ], - "AESEQ": [ - 1, - 2, - 3, - ], - } - ) - right_dataset = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "RDOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "QNAM": [ - "TEST", - "TEST", - "TEST", - "TEST_1", - ], - "IDVAR": [ - "AESEQ", - "AESEQ", - "AESEQ", - "AESEQ", - ], - "IDVARVAL": [ - "1.0", - "2", - "3.0", - "3.0", - ], - } - ) - - # call the tested function and check the results - merged_df = DataProcessor.merge_datasets_on_relationship_columns( - left_dataset=left_dataset, - left_dataset_match_keys=[], - right_dataset=right_dataset, - right_dataset_match_keys=[], - right_dataset_domain_name="SUPPAE", - column_with_names="IDVAR", - column_with_values="IDVARVAL", - ) - merged_df.data = merged_df.data.sort_values("AESEQ") - expected_df = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "DOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "AESEQ": [ - 1.0, - 2.0, - 3.0, - 3.0, - ], - "USUBJID.SUPPAE": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "RDOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "QNAM": [ - "TEST", - "TEST", - "TEST", - "TEST_1", - ], - "IDVAR": [ - "AESEQ", - "AESEQ", - "AESEQ", - "AESEQ", - ], - "IDVARVAL": [ - 1.0, - 2.0, - 3.0, - 3.0, - ], - } - ) - assert merged_df.equals(expected_df) - - -@pytest.mark.parametrize("dataset_implementation", [PandasDataset]) -def test_merge_datasets_on_string_relationship_columns(dataset_implementation): - """ - Unit test for DataProcessor.merge_datasets_on_relationship_columns method. - Test the case when the columns that describe the relation - are of a string type. - """ - # prepare data - left_dataset = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - ], - "DOMAIN": [ - "AE", - "AE", - "AE", - ], - "AESEQ": [ - "CDISC_IA", - "CDISC_IB", - "CDISC_IC", - ], - } - ) - right_dataset = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "RDOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "QNAM": [ - "TEST", - "TEST", - "TEST", - "TEST_1", - ], - "IDVAR": [ - "AESEQ", - "AESEQ", - "AESEQ", - "AESEQ", - ], - "IDVARVAL": [ - "CDISC_IA", - "CDISC_IB", - "CDISC_IC", - "CDISC_IC", - ], - } - ) - - # call the tested function and check the results - merged_df = DataProcessor.merge_datasets_on_relationship_columns( - left_dataset=left_dataset, - left_dataset_match_keys=[], - right_dataset=right_dataset, - right_dataset_match_keys=[], - right_dataset_domain_name="SUPPAE", - column_with_names="IDVAR", - column_with_values="IDVARVAL", - ) - expected_df = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "DOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "AESEQ": [ - "CDISC_IA", - "CDISC_IB", - "CDISC_IC", - "CDISC_IC", - ], - "USUBJID.SUPPAE": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "RDOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "QNAM": [ - "TEST", - "TEST", - "TEST", - "TEST_1", - ], - "IDVAR": [ - "AESEQ", - "AESEQ", - "AESEQ", - "AESEQ", - ], - "IDVARVAL": [ - "CDISC_IA", - "CDISC_IB", - "CDISC_IC", - "CDISC_IC", - ], - } - ) - assert merged_df.equals(expected_df) - - @pytest.mark.parametrize( "join_type, expected_df", [ From 79e33eb057d466e8748c81d419b6fb5df877bfda Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Mon, 17 Nov 2025 16:14:17 -0500 Subject: [PATCH 8/9] dask --- .../utilities/data_processor.py | 71 +++++++-- .../test_utilities/test_data_processor.py | 137 +++++++++++++++++- 2 files changed, 191 insertions(+), 17 deletions(-) diff --git a/cdisc_rules_engine/utilities/data_processor.py b/cdisc_rules_engine/utilities/data_processor.py index 463117269..30465e4a2 100644 --- a/cdisc_rules_engine/utilities/data_processor.py +++ b/cdisc_rules_engine/utilities/data_processor.py @@ -223,27 +223,40 @@ def merge_pivot_supp_dataset( col for col in ["IDVAR", "IDVARVAL"] if col in right_dataset.columns ] current_supp = right_dataset.drop(columns=columns_to_drop) - left_dataset = PandasDataset( - pd.merge( - left_dataset.data, - current_supp.data, - how="left", - on=common_keys, - suffixes=("", "_supp"), + if isinstance(left_dataset, DaskDataset): + left_pandas = PandasDataset(left_dataset.data.compute()) + merged_pandas = PandasDataset( + pd.merge( + left_pandas.data, + current_supp.data, + how="left", + on=common_keys, + suffixes=("", "_supp"), + ) ) - ) - for qnam in qnam_list: - qnam_check = left_dataset.data.dropna(subset=[qnam]) - grouped = qnam_check.groupby(common_keys).size() - if (grouped > 1).any(): - raise ValueError( - f"Multiple records with the same QNAM '{qnam}' match a single parent record" + DataProcessor._validate_qnam(merged_pandas.data, qnam_list, common_keys) + left_dataset = DaskDataset(merged_pandas.data) + else: + left_dataset = PandasDataset( + pd.merge( + left_dataset.data, + current_supp.data, + how="left", + on=common_keys, + suffixes=("", "_supp"), ) + ) + DataProcessor._validate_qnam(left_dataset.data, qnam_list, common_keys) else: + if dataset_implementation == DaskDataset: + left_dataset = PandasDataset(left_dataset.data.compute()) + right_dataset = PandasDataset(right_dataset.data.compute()) left_dataset = DataProcessor._merge_supp_with_multiple_idvars( left_dataset, right_dataset, static_keys, qnam_list ) - if dataset_implementation == DaskDataset: + if dataset_implementation == DaskDataset and not isinstance( + left_dataset, DaskDataset + ): left_dataset = DaskDataset(left_dataset.data) return left_dataset @@ -266,7 +279,12 @@ def _merge_supp_with_multiple_idvars( group_data[qnam] = pd.NA for index, row in group_data.iterrows(): group_data.at[index, row["QNAM"]] = row["QVAL"] - group_data = group_data.drop(columns=["QNAM", "QVAL", "QLABEL", "IDVAR"]) + columns_to_drop = [ + col + for col in ["QNAM", "QVAL", "QLABEL", "IDVAR"] + if col in group_data.columns + ] + group_data = group_data.drop(columns=columns_to_drop) group_data = group_data.rename(columns={"IDVARVAL": idvar_value}) common_keys = [ key @@ -326,6 +344,9 @@ def _merge_supp_with_multiple_idvars( def process_supp(supp_dataset): # TODO: QLABEL is not added to the new columns. This functionality is not supported directly in pandas. # initialize new columns for each unique QNAM in the dataset with NaN + is_dask = isinstance(supp_dataset, DaskDataset) + if is_dask: + supp_dataset = PandasDataset(supp_dataset.data.compute()) for qnam in supp_dataset["QNAM"].unique(): supp_dataset.data[qnam] = pd.NA # Set the value of the new columns only in their respective rows @@ -338,6 +359,24 @@ def process_supp(supp_dataset): supp_dataset = supp_dataset.drop(labels=columns_to_drop, axis=1) return supp_dataset + @staticmethod + def _validate_qnam( + data: pd.DataFrame, + qnam_list: list, + common_keys: List[str], + ): + for qnam in qnam_list: + if qnam not in data.columns: + continue + qnam_check = data.dropna(subset=[qnam]) + if len(qnam_check) == 0: + continue + grouped = qnam_check.groupby(common_keys).size() + if (grouped > 1).any(): + raise ValueError( + f"Multiple records with the same QNAM '{qnam}' match a single parent record" + ) + @staticmethod def merge_sdtm_datasets( left_dataset: DatasetInterface, diff --git a/tests/unit/test_utilities/test_data_processor.py b/tests/unit/test_utilities/test_data_processor.py index 48bb805d2..ddfafbeb2 100644 --- a/tests/unit/test_utilities/test_data_processor.py +++ b/tests/unit/test_utilities/test_data_processor.py @@ -2,7 +2,7 @@ import pandas as pd import pytest from cdisc_rules_engine.utilities.data_processor import DataProcessor -from cdisc_rules_engine.models.dataset import PandasDataset +from cdisc_rules_engine.models.dataset import PandasDataset, DaskDataset from cdisc_rules_engine.enums.join_types import JoinTypes @@ -212,3 +212,138 @@ def test_merge_datasets_on_join_type(join_type: JoinTypes, expected_df: PandasDa join_type=join_type, ) assert merged_df.equals(expected_df) + + +@pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) +def test_merge_pivot_supp_dataset_single_idvar(dataset_implementation): + left_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC01", "CDISC01"], + "DOMAIN": ["AE", "AE", "AE"], + "AESEQ": [1, 2, 3], + "AETERM": ["Headache", "Nausea", "Fatigue"], + } + ) + right_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC01", "CDISC01"], + "RDOMAIN": ["AE", "AE", "AE"], + "IDVAR": ["AESEQ", "AESEQ", "AESEQ"], + "IDVARVAL": ["1", "2", "3"], + "QNAM": ["AESPID", "AESPID", "AESPID"], + "QVAL": ["SP001", "SP002", "SP003"], + } + ) + + merged_df = DataProcessor.merge_pivot_supp_dataset( + dataset_implementation=dataset_implementation, + left_dataset=left_dataset, + right_dataset=right_dataset, + ) + if isinstance(merged_df, DaskDataset): + result_data = merged_df.data.compute() + else: + result_data = merged_df.data + + # Verify pivot + assert "AESPID" in merged_df.columns, "AESPID column should be created from QNAM" + assert "QNAM" not in merged_df.columns, "QNAM should be dropped after pivot" + assert "QVAL" not in merged_df.columns, "QVAL should be dropped after pivot" + assert result_data[result_data["AESEQ"] == "1"]["AESPID"].values[0] == "SP001" + assert result_data[result_data["AESEQ"] == "2"]["AESPID"].values[0] == "SP002" + assert result_data[result_data["AESEQ"] == "3"]["AESPID"].values[0] == "SP003" + assert len(result_data) == 3 + + +@pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) +def test_merge_pivot_supp_dataset_multiple_idvar(dataset_implementation): + left_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC01", "CDISC01"], + "DOMAIN": ["EC", "EC", "EC"], + "ECSEQ": [1, 2, 3], + "ECENDY": [5, 7, 10], + "ECTRT": ["Treatment A", "Treatment B", "Treatment C"], + } + ) + right_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC01", "CDISC01", "CDISC01"], + "RDOMAIN": ["EC", "EC", "EC", "EC"], + "IDVAR": ["ECSEQ", "ECSEQ", "ECENDY", "ECENDY"], + "IDVARVAL": ["1", "2", "7", "10"], + "QNAM": ["ECLOC", "ECLOC", "ECSITE", "ECSITE"], + "QVAL": ["Left Arm", "Right Arm", "Site A", "Site B"], + } + ) + + merged_df = DataProcessor.merge_pivot_supp_dataset( + dataset_implementation=dataset_implementation, + left_dataset=left_dataset, + right_dataset=right_dataset, + ) + if isinstance(merged_df, DaskDataset): + result_data = merged_df.data.compute() + else: + result_data = merged_df.data + + # Verify pivot + assert "ECLOC" in merged_df.columns, "ECLOC column should be created from QNAM" + assert "ECSITE" in merged_df.columns, "ECSITE column should be created from QNAM" + assert "QNAM" not in merged_df.columns, "QNAM should be dropped after pivot" + assert "QVAL" not in merged_df.columns, "QVAL should be dropped after pivot" + + row1 = result_data[result_data["ECSEQ"] == "1"].iloc[0] + assert row1["ECLOC"] == "Left Arm" + assert pd.isna(row1["ECSITE"]) + row2 = result_data[result_data["ECSEQ"] == "2"].iloc[0] + assert row2["ECLOC"] == "Right Arm" + assert row2["ECSITE"] == "Site A" + row3 = result_data[result_data["ECSEQ"] == "3"].iloc[0] + assert pd.isna(row3["ECLOC"]) + assert row3["ECSITE"] == "Site B" + assert len(result_data) == 3 + + +@pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) +def test_merge_pivot_supp_dataset_blank_idvar(dataset_implementation): + left_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC02"], + "DOMAIN": ["DM", "DM"], + "AGE": [45, 52], + } + ) + right_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC02"], + "RDOMAIN": ["DM", "DM"], + "IDVAR": ["", ""], # Blank IDVAR + "IDVARVAL": ["", ""], + "QNAM": ["DMPOPFLAG", "DMPOPFLAG"], + "QVAL": ["Y", "N"], + } + ) + merged_df = DataProcessor.merge_pivot_supp_dataset( + dataset_implementation=dataset_implementation, + left_dataset=left_dataset, + right_dataset=right_dataset, + ) + if isinstance(merged_df, DaskDataset): + result_data = merged_df.data.compute() + else: + result_data = merged_df.data + + # Verify pivot + assert ( + "DMPOPFLAG" in merged_df.columns + ), "DMPOPFLAG column should be created from QNAM" + assert "QNAM" not in merged_df.columns, "QNAM should be dropped after pivot" + assert "QVAL" not in merged_df.columns, "QVAL should be dropped after pivot" + assert ( + result_data[result_data["USUBJID"] == "CDISC01"]["DMPOPFLAG"].values[0] == "Y" + ) + assert ( + result_data[result_data["USUBJID"] == "CDISC02"]["DMPOPFLAG"].values[0] == "N" + ) + assert len(result_data) == 2 From 53b563683cbe476f7bb45f966d8c576cac5436ec Mon Sep 17 00:00:00 2001 From: Samuel Johnson Date: Tue, 18 Nov 2025 15:26:28 -0500 Subject: [PATCH 9/9] dask logic --- .../utilities/data_processor.py | 57 ++++++++++++------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/cdisc_rules_engine/utilities/data_processor.py b/cdisc_rules_engine/utilities/data_processor.py index 30465e4a2..e94ed9c20 100644 --- a/cdisc_rules_engine/utilities/data_processor.py +++ b/cdisc_rules_engine/utilities/data_processor.py @@ -223,19 +223,15 @@ def merge_pivot_supp_dataset( col for col in ["IDVAR", "IDVARVAL"] if col in right_dataset.columns ] current_supp = right_dataset.drop(columns=columns_to_drop) - if isinstance(left_dataset, DaskDataset): - left_pandas = PandasDataset(left_dataset.data.compute()) - merged_pandas = PandasDataset( - pd.merge( - left_pandas.data, - current_supp.data, - how="left", - on=common_keys, - suffixes=("", "_supp"), - ) + if dataset_implementation == DaskDataset: + current_supp = DaskDataset(current_supp.data) + left_dataset = left_dataset.merge( + other=current_supp.data, + how="left", + on=common_keys, + suffixes=("", "_supp"), ) - DataProcessor._validate_qnam(merged_pandas.data, qnam_list, common_keys) - left_dataset = DaskDataset(merged_pandas.data) + DataProcessor._validate_qnam_dask(left_dataset, qnam_list, common_keys) else: left_dataset = PandasDataset( pd.merge( @@ -250,14 +246,15 @@ def merge_pivot_supp_dataset( else: if dataset_implementation == DaskDataset: left_dataset = PandasDataset(left_dataset.data.compute()) - right_dataset = PandasDataset(right_dataset.data.compute()) - left_dataset = DataProcessor._merge_supp_with_multiple_idvars( - left_dataset, right_dataset, static_keys, qnam_list - ) - if dataset_implementation == DaskDataset and not isinstance( - left_dataset, DaskDataset - ): - left_dataset = DaskDataset(left_dataset.data) + right_pandas = PandasDataset(right_dataset.data.compute()) + left_dataset = DataProcessor._merge_supp_with_multiple_idvars( + left_dataset, right_pandas, static_keys, qnam_list + ) + left_dataset = DaskDataset(left_dataset.data) + else: + left_dataset = DataProcessor._merge_supp_with_multiple_idvars( + left_dataset, right_dataset, static_keys, qnam_list + ) return left_dataset @staticmethod @@ -377,6 +374,26 @@ def _validate_qnam( f"Multiple records with the same QNAM '{qnam}' match a single parent record" ) + @staticmethod + def _validate_qnam_dask( + left_dataset: DaskDataset, + qnam_list: list, + common_keys: List[str], + ): + for qnam in qnam_list: + if qnam not in left_dataset.columns: + continue + qnam_check = left_dataset.data[~left_dataset.data[qnam].isna()] + if len(qnam_check) == 0: + continue + grouped_counts = qnam_check.groupby(common_keys).size() + problem_groups = grouped_counts[grouped_counts > 1] + problem_groups_computed = problem_groups.compute() + if len(problem_groups_computed) > 0: + raise ValueError( + f"Multiple records with the same QNAM '{qnam}' match a single parent record. " + ) + @staticmethod def merge_sdtm_datasets( left_dataset: DatasetInterface,