diff --git a/cdisc_rules_engine/models/rule.py b/cdisc_rules_engine/models/rule.py index 7a9fa3f6f..090af8d5a 100644 --- a/cdisc_rules_engine/models/rule.py +++ b/cdisc_rules_engine/models/rule.py @@ -184,12 +184,6 @@ def parse_actions(cls, actions_data: dict) -> List[dict]: @classmethod def parse_datasets(cls, match_key_data: List[dict]) -> List[dict]: - # Defaulting to IDVAR and IDVARVAL as relationship columns. - # May change in the future as more standard rules are written. - relationship_columns = { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - } if not match_key_data: return None datasets = [] @@ -206,8 +200,6 @@ def parse_datasets(cls, match_key_data: List[dict]) -> List[dict]: ], "wildcard": data.get("Wildcard", "**"), } - if data.get("Is_Relationship", False): - join_data["relationship_columns"] = relationship_columns if "Join_Type" in data: join_data["join_type"] = data.get("Join_Type") if "Child" in data: diff --git a/cdisc_rules_engine/utilities/data_processor.py b/cdisc_rules_engine/utilities/data_processor.py index 7d871b562..e94ed9c20 100644 --- a/cdisc_rules_engine/utilities/data_processor.py +++ b/cdisc_rules_engine/utilities/data_processor.py @@ -44,162 +44,6 @@ def convert_float_merge_keys(series: pd.Series) -> pd.Series: """ return series.astype(float, errors="ignore").astype(str) - @staticmethod - def filter_dataset_by_match_keys_of_other_dataset( - dataset: DatasetInterface, - dataset_match_keys: List[str], - other_dataset: DatasetInterface, - other_dataset_match_keys: List[str], - ) -> DatasetInterface: - """ - Returns a DataFrame where values of match keys of - dataset are equal to values of match keys of other dataset. - Example: - dataset = USUBJID DOMAIN - CDISC001 AE - CDISC001 AE - CDISC009 AE - dataset_match_keys = ["USUBJID"] - other_dataset = USUBJID DOMAIN - CDISC009 AE - other_dataset_match_keys = ["USUBJID"] - - The result will be: USUBJID DOMAIN - CDISC009 AE - """ - dataset_w_ind = dataset.set_index(dataset_match_keys) - other_dataset_w_ind = other_dataset.set_index(other_dataset_match_keys) - condition = dataset_w_ind.index.isin(other_dataset_w_ind.index) - return dataset_w_ind[condition].reset_index() - - @staticmethod - def filter_parent_dataset_by_supp_dataset( - parent_dataset: DatasetInterface, - supp_dataset: DatasetInterface, - column_with_names: str, - column_with_values: str, - ) -> DatasetInterface: - """ - A wrapper function for convenient filtering of parent dataset by supp dataset. - Does two things: - 1. Filters parent dataset by RDOMAIN column of supp dataset. - 2. Filters parent dataset by columns of supp dataset - that describe their relation. - """ - parent_dataset = DataProcessor.filter_parent_dataset_by_supp_dataset_rdomain( - parent_dataset, supp_dataset - ) - return DataProcessor.filter_dataset_by_nested_columns_of_other_dataset( - parent_dataset, supp_dataset, column_with_names, column_with_values - ) - - @staticmethod - def filter_parent_dataset_by_supp_dataset_rdomain( - parent_dataset: DatasetInterface, supp_dataset: DatasetInterface - ) -> DatasetInterface: - """ - Leaves only those rows in parent dataset - where DOMAIN is the same as RDOMAIN of supp dataset. - """ - parent_domain_values: pd.Series = parent_dataset.get("DOMAIN", pd.Series()) - supp_domain_values: pd.Series = supp_dataset.get("RDOMAIN", pd.Series()) - if parent_domain_values.empty or supp_domain_values.empty: - return parent_dataset - - return parent_dataset[parent_domain_values.isin(supp_domain_values)] - - @staticmethod - def filter_dataset_by_nested_columns_of_other_dataset( - dataset: DatasetInterface, - other_dataset: DatasetInterface, - column_with_names: str, - column_with_values: str, - ) -> DatasetInterface: - """ - other_dataset has two columns: - 1. list of column names which exist in dataset - column_with_names. - 2. list of values of the aforementioned columns - column_with_values. - - The function filters columns name of dataset by their respective column values. - - Example: - other_dataset = IDVAR IDVARVAL - ECSEQ 100 - ECSEQ 101 - ECNUM 105 - column_with_names = "IDVAR" - column_with_values = "IDVARVAL" - - We need to leave only those rows in dataset - where dataset["ECSEQ"] is equal to 100 or 101 - AND dataset["ECNUM"] is equal to 105. - """ - if ( - other_dataset[column_with_names].str.strip().eq("").all() - and other_dataset[column_with_values].str.strip().eq("").all() - ): - return dataset - grouped = other_dataset.groupby(column_with_names, group_keys=False) - - def filter_dataset_by_group_values(group) -> DatasetInterface: - decimal_group_values: pd.Series = DataProcessor.convert_float_merge_keys( - group[column_with_values] - ) - decimal_dataset_values: pd.Series = DataProcessor.convert_float_merge_keys( - dataset[group.name] - ) - condition: pd.Series = decimal_dataset_values.isin(decimal_group_values) - return dataset[condition] - - result = grouped.apply(lambda group: filter_dataset_by_group_values(group)) - # grouping breaks sorting, need to sort once again - return dataset.__class__(result.sort_values(list(grouped.groups.keys()))) - - @staticmethod - def merge_datasets_on_relationship_columns( - left_dataset: DatasetInterface, - left_dataset_match_keys: List[str], - right_dataset: DatasetInterface, - right_dataset_match_keys: List[str], - right_dataset_domain_name: str, - column_with_names: str, - column_with_values: str, - ) -> DatasetInterface: - """ - Uses full join to merge given datasets on the - columns that describe their relation. - """ - # right dataset holds column names of left dataset. - # all values in the column are the same - if ( - right_dataset[column_with_names].str.strip().eq("").all() - and right_dataset[column_with_values].str.strip().eq("").all() - ): - return left_dataset.merge( - other=right_dataset.data, - left_on=left_dataset_match_keys, - right_on=right_dataset_match_keys, - how="outer", - suffixes=("", f".{right_dataset_domain_name}"), - ) - left_ds_col_name: str = right_dataset[column_with_names][0] - - # convert numeric columns to one data type to avoid merging errors - # there is no point in converting string cols since their data type is the same - DataProcessor.cast_numeric_cols_to_same_data_type( - right_dataset, column_with_values, left_dataset, left_ds_col_name - ) - left_dataset_match_keys.append(left_ds_col_name) - right_dataset_match_keys.append(column_with_values) - - return left_dataset.merge( - other=right_dataset.data, - left_on=left_dataset_match_keys, - right_on=right_dataset_match_keys, - how="outer", - suffixes=("", f".{right_dataset_domain_name}"), - ) - @staticmethod def filter_if_present(df: DatasetInterface, col: str, filter_value): """ @@ -348,38 +192,6 @@ def merge_relrec_datasets( result = objs[0].concat(objs[1:], ignore_index=True) return result - @staticmethod - def merge_relationship_datasets( - left_dataset: DatasetInterface, - left_dataset_match_keys: List[str], - right_dataset: DatasetInterface, - right_dataset_match_keys: List[str], - right_dataset_domain: dict, - ) -> DatasetInterface: - result = DataProcessor.filter_dataset_by_match_keys_of_other_dataset( - left_dataset, - left_dataset_match_keys, - right_dataset, - right_dataset_match_keys, - ) - result = DataProcessor.filter_parent_dataset_by_supp_dataset( - result, - right_dataset, - **right_dataset_domain["relationship_columns"], - ) - - # convert result back to DatasetInterface class - result = left_dataset.__class__(result.reset_index(drop=True)) - result = DataProcessor.merge_datasets_on_relationship_columns( - left_dataset=result, - left_dataset_match_keys=left_dataset_match_keys, - right_dataset=right_dataset, - right_dataset_match_keys=right_dataset_match_keys, - right_dataset_domain_name=right_dataset_domain.get("domain"), - **right_dataset_domain["relationship_columns"], - ) - return result - @staticmethod def merge_pivot_supp_dataset( dataset_implementation: DatasetInterface, @@ -389,52 +201,199 @@ def merge_pivot_supp_dataset( static_keys = ["STUDYID", "USUBJID", "APID", "POOLID", "SPDEVID"] qnam_list = right_dataset["QNAM"].unique() - right_dataset = DataProcessor.process_supp(right_dataset) - dynamic_key = right_dataset["IDVAR"].iloc[0] + unique_idvar_values = right_dataset["IDVAR"].unique() + if len(unique_idvar_values) == 1: + right_dataset = DataProcessor.process_supp(right_dataset) + dynamic_key = right_dataset["IDVAR"].iloc[0] + is_blank = pd.isna(dynamic_key) or str(dynamic_key).strip() == "" + # Determine the common keys present in both datasets + common_keys = [ + key + for key in static_keys + if key in left_dataset.columns and key in right_dataset.columns + ] + if not is_blank: + common_keys.append(dynamic_key) + current_supp = right_dataset.rename(columns={"IDVARVAL": dynamic_key}) + current_supp = current_supp.drop(columns=["IDVAR"]) + left_dataset[dynamic_key] = left_dataset[dynamic_key].astype(str) + current_supp[dynamic_key] = current_supp[dynamic_key].astype(str) + else: + columns_to_drop = [ + col for col in ["IDVAR", "IDVARVAL"] if col in right_dataset.columns + ] + current_supp = right_dataset.drop(columns=columns_to_drop) + if dataset_implementation == DaskDataset: + current_supp = DaskDataset(current_supp.data) + left_dataset = left_dataset.merge( + other=current_supp.data, + how="left", + on=common_keys, + suffixes=("", "_supp"), + ) + DataProcessor._validate_qnam_dask(left_dataset, qnam_list, common_keys) + else: + left_dataset = PandasDataset( + pd.merge( + left_dataset.data, + current_supp.data, + how="left", + on=common_keys, + suffixes=("", "_supp"), + ) + ) + DataProcessor._validate_qnam(left_dataset.data, qnam_list, common_keys) + else: + if dataset_implementation == DaskDataset: + left_dataset = PandasDataset(left_dataset.data.compute()) + right_pandas = PandasDataset(right_dataset.data.compute()) + left_dataset = DataProcessor._merge_supp_with_multiple_idvars( + left_dataset, right_pandas, static_keys, qnam_list + ) + left_dataset = DaskDataset(left_dataset.data) + else: + left_dataset = DataProcessor._merge_supp_with_multiple_idvars( + left_dataset, right_dataset, static_keys, qnam_list + ) + return left_dataset - # Determine the common keys present in both datasets - common_keys = [ - key - for key in static_keys - if key in left_dataset.columns and key in right_dataset.columns - ] - common_keys.append(dynamic_key) - current_supp = right_dataset.rename(columns={"IDVARVAL": dynamic_key}) - current_supp = current_supp.drop(columns=["IDVAR"]) - left_dataset[dynamic_key] = left_dataset[dynamic_key].astype(str) - current_supp[dynamic_key] = current_supp[dynamic_key].astype(str) - left_dataset = PandasDataset( - pd.merge( - left_dataset.data, - current_supp.data, - how="left", - on=common_keys, - suffixes=("", "_supp"), + @staticmethod + def _merge_supp_with_multiple_idvars( + left_dataset: DatasetInterface, + right_dataset: DatasetInterface, + static_keys: List[str], + qnam_list: list, + ) -> DatasetInterface: + result_dataset = left_dataset + + # Process each IDVAR group separately + for idvar_value in right_dataset["IDVAR"].unique(): + group_data = right_dataset.data[ + right_dataset.data["IDVAR"] == idvar_value + ].copy() + group_qnam_list = group_data["QNAM"].unique() + for qnam in group_qnam_list: + group_data[qnam] = pd.NA + for index, row in group_data.iterrows(): + group_data.at[index, row["QNAM"]] = row["QVAL"] + columns_to_drop = [ + col + for col in ["QNAM", "QVAL", "QLABEL", "IDVAR"] + if col in group_data.columns + ] + group_data = group_data.drop(columns=columns_to_drop) + group_data = group_data.rename(columns={"IDVARVAL": idvar_value}) + common_keys = [ + key + for key in static_keys + if key in result_dataset.columns and key in group_data.columns + ] + common_keys.append(idvar_value) + + agg_dict = { + col: lambda x: x.dropna().iloc[0] if not x.dropna().empty else pd.NA + for col in group_data.columns + if col not in common_keys + } + group_data = group_data.groupby( + common_keys, as_index=False, dropna=False + ).agg(agg_dict) + cols_to_drop = [ + col + for col in group_data.columns + if col in result_dataset.columns and col not in common_keys + ] + if cols_to_drop: + group_data = group_data.drop(columns=cols_to_drop) + result_dataset[idvar_value] = result_dataset[idvar_value].astype(str) + group_data[idvar_value] = group_data[idvar_value].astype(str) + + result_dataset = PandasDataset( + pd.merge( + result_dataset.data, + group_data, + how="left", + on=common_keys, + suffixes=("", "_supp"), + ) ) - ) for qnam in qnam_list: - qnam_check = left_dataset.data.dropna(subset=[qnam]) - grouped = qnam_check.groupby(common_keys).size() + if qnam not in result_dataset.columns: + continue + qnam_check = result_dataset.data.dropna(subset=[qnam]) + if len(qnam_check) == 0: + continue + idvar_for_qnam = right_dataset.data[right_dataset.data["QNAM"] == qnam][ + "IDVAR" + ].iloc[0] + validation_keys = [ + key for key in static_keys if key in result_dataset.columns + ] + validation_keys.append(idvar_for_qnam) + grouped = qnam_check.groupby(validation_keys).size() if (grouped > 1).any(): raise ValueError( f"Multiple records with the same QNAM '{qnam}' match a single parent record" ) - if dataset_implementation == DaskDataset: - left_dataset = DaskDataset(left_dataset.data) - return left_dataset + return result_dataset @staticmethod def process_supp(supp_dataset): # TODO: QLABEL is not added to the new columns. This functionality is not supported directly in pandas. # initialize new columns for each unique QNAM in the dataset with NaN + is_dask = isinstance(supp_dataset, DaskDataset) + if is_dask: + supp_dataset = PandasDataset(supp_dataset.data.compute()) for qnam in supp_dataset["QNAM"].unique(): supp_dataset.data[qnam] = pd.NA # Set the value of the new columns only in their respective rows for index, row in supp_dataset.iterrows(): supp_dataset.at[index, row["QNAM"]] = row["QVAL"] - supp_dataset.drop(labels=["QNAM", "QVAL", "QLABEL"], axis=1) + columns_to_drop = [ + col for col in ["QNAM", "QVAL", "QLABEL"] if col in supp_dataset.columns + ] + if columns_to_drop: + supp_dataset = supp_dataset.drop(labels=columns_to_drop, axis=1) return supp_dataset + @staticmethod + def _validate_qnam( + data: pd.DataFrame, + qnam_list: list, + common_keys: List[str], + ): + for qnam in qnam_list: + if qnam not in data.columns: + continue + qnam_check = data.dropna(subset=[qnam]) + if len(qnam_check) == 0: + continue + grouped = qnam_check.groupby(common_keys).size() + if (grouped > 1).any(): + raise ValueError( + f"Multiple records with the same QNAM '{qnam}' match a single parent record" + ) + + @staticmethod + def _validate_qnam_dask( + left_dataset: DaskDataset, + qnam_list: list, + common_keys: List[str], + ): + for qnam in qnam_list: + if qnam not in left_dataset.columns: + continue + qnam_check = left_dataset.data[~left_dataset.data[qnam].isna()] + if len(qnam_check) == 0: + continue + grouped_counts = qnam_check.groupby(common_keys).size() + problem_groups = grouped_counts[grouped_counts > 1] + problem_groups_computed = problem_groups.compute() + if len(problem_groups_computed) > 0: + raise ValueError( + f"Multiple records with the same QNAM '{qnam}' match a single parent record. " + ) + @staticmethod def merge_sdtm_datasets( left_dataset: DatasetInterface, @@ -471,54 +430,6 @@ def merge_sdtm_datasets( ] = None return result - @staticmethod - def cast_numeric_cols_to_same_data_type( - left_dataset: DatasetInterface, - left_dataset_column: str, - right_dataset: DatasetInterface, - right_dataset_column: str, - ): - """ - Casts given columns to one data type (float) - ONLY if they are numeric. - Before casting, the method ensures that both of the - columns hold numeric values and performs conversion - only if both of them are. - - Example 1: - left_dataset[left_dataset_column] = [1, 2, 3, 4, ] - right_dataset[right_dataset_column] = ["1.0", "2.0", "3.0", "4.0", ] - - Example 2: - left_dataset[left_dataset_column] = ["1", "2", "3", "4", ] - right_dataset[right_dataset_column] = ["1.0", "2.0", "3.0", "4.0", ] - - Example 3: - left_dataset[left_dataset_column] = ["1", "2", "3", "4", ] - right_dataset[right_dataset_column] = [1, 2, 3, 4, ] - """ - # check if both columns are numeric - left_is_numeric: bool = DataProcessor.column_contains_numeric( - left_dataset[left_dataset_column] - ) - right_is_numeric: bool = DataProcessor.column_contains_numeric( - right_dataset[right_dataset_column] - ) - if left_is_numeric and right_is_numeric: - # convert to float - right_dataset[right_dataset_column] = right_dataset[ - right_dataset_column - ].astype(float) - left_dataset[left_dataset_column] = left_dataset[ - left_dataset_column - ].astype(float) - - @staticmethod - def column_contains_numeric(column: pd.Series) -> bool: - if not pd.api.types.is_numeric_dtype(column): - return column.str.replace(".", "").str.isdigit().all() - return True - @staticmethod def filter_dataset_columns_by_metadata_and_rule( columns: List[str], diff --git a/cdisc_rules_engine/utilities/dataset_preprocessor.py b/cdisc_rules_engine/utilities/dataset_preprocessor.py index aca3dead9..a86437d69 100644 --- a/cdisc_rules_engine/utilities/dataset_preprocessor.py +++ b/cdisc_rules_engine/utilities/dataset_preprocessor.py @@ -495,20 +495,14 @@ def _merge_datasets( dataset_preprocessor=self, wildcard=right_dataset_domain_details.get("wildcard"), ) - elif right_dataset_domain_name == "SUPP--": - result: DatasetInterface = DataProcessor.merge_pivot_supp_dataset( + elif right_dataset_domain_name.startswith( + "SUPP" + ) or right_dataset_domain_name.startswith("SQ"): + result = DataProcessor.merge_pivot_supp_dataset( dataset_implementation=self._data_service.dataset_implementation, left_dataset=left_dataset, right_dataset=right_dataset, ) - elif self._rule_processor.is_relationship_dataset(right_dataset_domain_name): - result: DatasetInterface = DataProcessor.merge_relationship_datasets( - left_dataset=left_dataset, - left_dataset_match_keys=left_dataset_match_keys, - right_dataset=right_dataset, - right_dataset_match_keys=right_dataset_match_keys, - right_dataset_domain=right_dataset_domain_details, - ) else: result: DatasetInterface = DataProcessor.merge_sdtm_datasets( left_dataset=left_dataset, diff --git a/requirements.txt b/requirements.txt index 947ee21e7..4e130d38f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,25 +1,25 @@ business_rules_enhanced==1.4.8 +cachetools==6.1.0 cdisc-library-client==0.1.6 click==8.1.7 +dask[dataframe]==2024.6.0 +dask[array]==2024.6.0 +fastparquet==2024.2.0 importlib-metadata==8.5.0 jsonata-python==0.6.0 jsonpath-ng==1.6.1 jsonschema==4.18.5 +lxml==5.2.1 numpy~=1.26.0 odmlib==0.1.4 openpyxl==3.1.5 pandas==2.1.4 +psutil==6.1.1 pyinstaller==6.11.0 +Pympler==1.1 +pyreadstat==1.2.7 python-dotenv==1.0.0 pyyaml==6.0.2 redis==4.5.0 requests~=2.32.3 setuptools~=75.6.0 -cachetools==6.1.0 -Pympler==1.1 -psutil==6.1.1 -dask[dataframe]==2024.6.0 -dask[array]==2024.6.0 -pyreadstat==1.2.7 -fastparquet==2024.2.0 -lxml==5.2.1 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 2b79308b3..b0bed96db 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -876,10 +876,6 @@ def dataset_rule_record_in_parent_domain_equal_to() -> dict: { "domain_name": "SUPPEC", "match_key": ["USUBJID"], - "relationship_columns": { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - }, } ], "conditions": ConditionCompositeFactory.get_condition_composite( @@ -888,7 +884,7 @@ def dataset_rule_record_in_parent_domain_equal_to() -> dict: { "name": "get_dataset", "operator": "equal_to", - "value": {"target": "QNAM", "comparator": "ECREASOC"}, + "value": {"target": "ECREASOC", "comparator": "Some Value 1"}, }, { "name": "get_dataset", @@ -905,7 +901,7 @@ def dataset_rule_record_in_parent_domain_equal_to() -> dict: } ], "output_variables": [ - "QNAM", + "ECREASOC", "ECPRESP", ], } diff --git a/tests/resources/CoreIssue747/Rule_underscores.json b/tests/resources/CoreIssue747/Rule_underscores.json index 709018a6b..ccaab488b 100755 --- a/tests/resources/CoreIssue747/Rule_underscores.json +++ b/tests/resources/CoreIssue747/Rule_underscores.json @@ -1,118 +1,111 @@ { - "Authorities": [ + "Authorities": [ + { + "Organization": "CDISC", + "Standards": [ { - "Organization": "CDISC", - "Standards": [ - { - "Name": "SDTMIG", - "References": [ - { - "Citations": [ - { - "Document": "IG v3.4", - "Section": "Table 3.2.1", - "Cited_Guidance": "Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different." - }, - { - "Document": "IG v3.4", - "Section": "3.2.1.1", - "Cited_Guidance": "Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order. The identified keys for each dataset should be consistent with the description of the dataset structure as described in the Define-XML document." - } - ], - "Origin": "SDTM and SDTMIG Conformance Rules", - "Version": "2.0", - "Rule_Identifier": { - "Id": "CG0019", - "Version": "1" - } - } - ], - "Version": "3.4" - }, + "Name": "SDTMIG", + "References": [ + { + "Citations": [ { - "Name": "SDTMIG", - "References": [ - { - "Citations": [ - { - "Document": "IG v3.2", - "Section": "Table 3.2.1|3.2.1.1", - "Cited_Guidance": "Table 3.2.1[Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different.]|3.2.1.1[Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order.]" - } - ], - "Origin": "SDTM and SDTMIG Conformance Rules", - "Version": "2.0", - "Rule_Identifier": { - "Id": "CG0019", - "Version": "1" - } - } - ], - "Version": "3.2" + "Document": "IG v3.4", + "Section": "Table 3.2.1", + "Cited_Guidance": "Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different." }, { - "Name": "SDTMIG", - "References": [ - { - "Citations": [ - { - "Document": "IG v3.3", - "Section": "Table 3.2.1|3.2.1.1", - "Cited_Guidance": "Table 3.2.1[Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different.]||3.2.1.1[Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order.]" - } - ], - "Origin": "SDTM and SDTMIG Conformance Rules", - "Version": "2.0", - "Rule_Identifier": { - "Id": "CG0019", - "Version": "1" - } - } - ], - "Version": "3.3" + "Document": "IG v3.4", + "Section": "3.2.1.1", + "Cited_Guidance": "Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order. The identified keys for each dataset should be consistent with the description of the dataset structure as described in the Define-XML document." } - ] - } - ], - "Check": { - "all": [ + ], + "Origin": "SDTM and SDTMIG Conformance Rules", + "Version": "2.0", + "Rule_Identifier": { + "Id": "CG0019", + "Version": "1" + } + } + ], + "Version": "3.4" + }, + { + "Name": "SDTMIG", + "References": [ { - "name": "define_dataset_key_sequence", - "operator": "is_not_unique_set" + "Citations": [ + { + "Document": "IG v3.2", + "Section": "Table 3.2.1|3.2.1.1", + "Cited_Guidance": "Table 3.2.1[Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different.]|3.2.1.1[Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order.]" + } + ], + "Origin": "SDTM and SDTMIG Conformance Rules", + "Version": "2.0", + "Rule_Identifier": { + "Id": "CG0019", + "Version": "1" + } } - ] - }, - "Core": { - "Id": "CDISC.SDTMIG.CG0019", - "Status": "Draft", - "Version": "1" - }, - "Description": "Trigger error if records are not unique as per sponsor defined key variables as documented in the define.xml", - "Executability": "Fully Executable", - "Outcome": { - "Message": "Records are not unique as per sponsor defined key variables as documented in the define.xml" - }, - "Scope": { - "Classes": { - "Include": [ - "ALL" - ] + ], + "Version": "3.2" }, - "Domains": { - "Include": [ - "ALL" - ] - } - }, - "Sensitivity": "Record", - "Match_Datasets": [ { - "Keys": [ - "USUBJID" - ], - "Name": "SUPP--", - "Is_Relationship": true + "Name": "SDTMIG", + "References": [ + { + "Citations": [ + { + "Document": "IG v3.3", + "Section": "Table 3.2.1|3.2.1.1", + "Cited_Guidance": "Table 3.2.1[Note that the key variables shown in this table are examples only. A sponsor's actual key structure may be different.]||3.2.1.1[Since the purpose of this column is to aid reviewers in understanding the structure of a dataset, sponsors should list all of the natural keys (see definition below) for the dataset. These keys should define uniqueness for records within a dataset, and may define a record sort order.]" + } + ], + "Origin": "SDTM and SDTMIG Conformance Rules", + "Version": "2.0", + "Rule_Identifier": { + "Id": "CG0019", + "Version": "1" + } + } + ], + "Version": "3.3" } - ], - "Rule_Type": "Dataset Contents Check against Define XML" + ] + } + ], + "Check": { + "all": [ + { + "name": "define_dataset_key_sequence", + "operator": "is_not_unique_set" + } + ] + }, + "Core": { + "Id": "CDISC.SDTMIG.CG0019", + "Status": "Draft", + "Version": "1" + }, + "Description": "Trigger error if records are not unique as per sponsor defined key variables as documented in the define.xml", + "Executability": "Fully Executable", + "Outcome": { + "Message": "Records are not unique as per sponsor defined key variables as documented in the define.xml" + }, + "Scope": { + "Classes": { + "Include": ["ALL"] + }, + "Domains": { + "Include": ["ALL"] + } + }, + "Sensitivity": "Record", + "Match_Datasets": [ + { + "Keys": ["USUBJID"], + "Name": "SUPP--" + } + ], + "Rule_Type": "Dataset Contents Check against Define XML" } diff --git a/tests/unit/test_dataset_preprocessor.py b/tests/unit/test_dataset_preprocessor.py index ab5c456da..319038eff 100644 --- a/tests/unit/test_dataset_preprocessor.py +++ b/tests/unit/test_dataset_preprocessor.py @@ -972,7 +972,7 @@ def test_preprocess_with_merge_comparison( def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): """ Test preprocessing when SUPP dataset has blank IDVAR and IDVARVAL. - Should merge successfully without attempting float conversion. + Should pivot and merge on static keys. """ main_data = { "USUBJID": ["CDISC001", "CDISC002"], @@ -984,8 +984,8 @@ def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): supp_data = { "USUBJID": ["CDISC001", "CDISC002"], "RDOMAIN": ["AE", "AE"], - "IDVAR": ["", ""], # Blank IDVAR - "IDVARVAL": ["", ""], # Blank IDVARVAL + "IDVAR": ["", ""], + "IDVARVAL": ["", ""], "QNAM": ["AESPID", "AESPID"], "QVAL": ["SCREENING", "BASELINE"], } @@ -1005,10 +1005,6 @@ def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): { "domain_name": "SUPPAE", "match_key": ["USUBJID"], - "relationship_columns": { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - }, } ], "conditions": ConditionCompositeFactory.get_condition_composite( @@ -1017,7 +1013,7 @@ def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): { "name": "get_dataset", "operator": "equal_to", - "value": {"target": "QVAL", "comparator": "SCREENING"}, + "value": {"target": "AESPID", "comparator": "SCREENING"}, } ] } @@ -1030,9 +1026,17 @@ def test_preprocess_supp_with_blank_idvar_idvarval(mock_get_dataset): ] result = preprocessor.preprocess(rule, datasets) assert len(result.data) == 2 - assert "QNAM" in result.data.columns - assert "QVAL" in result.data.columns - assert all(qnam == "AESPID" for qnam in result.data["QNAM"]) + assert "AESPID" in result.data.columns + assert "QNAM" not in result.data.columns + assert "QVAL" not in result.data.columns + assert ( + result.data[result.data["USUBJID"] == "CDISC001"]["AESPID"].values[0] + == "SCREENING" + ) + assert ( + result.data[result.data["USUBJID"] == "CDISC002"]["AESPID"].values[0] + == "BASELINE" + ) @patch("cdisc_rules_engine.services.data_services.LocalDataService.get_dataset") @@ -1155,10 +1159,6 @@ def test_preprocess_specific_suppae_dataset( { "domain_name": "SUPPAE", "match_key": ["USUBJID"], - "relationship_columns": { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - }, } ], "conditions": ConditionCompositeFactory.get_condition_composite( @@ -1167,7 +1167,7 @@ def test_preprocess_specific_suppae_dataset( { "name": "get_dataset", "operator": "equal_to", - "value": {"target": "QVAL", "comparator": "SP001"}, + "value": {"target": "AESPID", "comparator": "SP001"}, } ] } @@ -1184,20 +1184,15 @@ def test_preprocess_specific_suppae_dataset( data_service = LocalDataService(MagicMock(), MagicMock(), MagicMock()) preprocessor = DatasetPreprocessor( ae_dataset, - SDTMDatasetMetadata( - first_record={"DOMAIN": "AE"}, full_path=os.path.join("path", "ae.xpt") - ), + SDTMDatasetMetadata(first_record={"DOMAIN": "AE"}, full_path="path"), data_service, InMemoryCacheService(), ) result = preprocessor.preprocess(rule_with_specific_supp, datasets) - assert len(result.data) >= 1 - assert "USUBJID" in result.data.columns - assert "AETERM" in result.data.columns - assert "QNAM" in result.data.columns - assert "QVAL" in result.data.columns - qval_values = result.data["QVAL"].dropna().tolist() - assert "SP001" in qval_values - expected_path = os.path.join("path", "suppae.xpt") - mock_get_dataset.assert_called_with(dataset_name=expected_path) + + assert len(result.data) == 1 + assert "AESPID" in result.data.columns + assert result.data["AESPID"].values[0] == "SP001" + assert "QNAM" not in result.data.columns + assert "QVAL" not in result.data.columns diff --git a/tests/unit/test_merge_supp_datasets.py b/tests/unit/test_merge_supp_datasets.py index 8ab6e026a..370bdb728 100644 --- a/tests/unit/test_merge_supp_datasets.py +++ b/tests/unit/test_merge_supp_datasets.py @@ -123,3 +123,176 @@ def dummy_func(dataset_name, **kwargs): ) assert len(merged_dataset.data) == len(parent_dataset.data), "The" " length of the merged dataset should match the parent dataset." + + +@patch.object(LocalDataService, "check_filepath", return_value=False) +@patch.object(LocalDataService, "_async_get_datasets") +def test_merge_supp_dataset_multi_idvar(mock_async_get_datasets, data_service): + parent_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1", "STUDY1", "STUDY1"], + "USUBJID": ["001", "001", "002", "002"], + "ECSEQ": ["1", "2", "1", "2"], + "ECENDY": ["5", "7", "5", "7"], + "ECTRT": ["Treatment A", "Treatment B", "Treatment A", "Treatment B"], + } + ) + ) + + supp_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1", "STUDY1", "STUDY1", "STUDY1", "STUDY1"], + "USUBJID": ["001", "001", "001", "001", "002", "002"], + "RDOMAIN": ["EC", "EC", "EC", "EC", "EC", "EC"], + "IDVAR": ["ECSEQ", "ECSEQ", "ECENDY", "ECENDY", "ECSEQ", "ECENDY"], + "IDVARVAL": ["1", "2", "7", "7", "1", "5"], + "QNAM": ["ECLOC", "ECLOC", "ECSITE", "ECREGION", "ECLOC", "ECSITE"], + "QVAL": [ + "Left Arm", + "Right Arm", + "Site A", + "Region 1", + "Left Leg", + "Site B", + ], + "QLABEL": [ + "Location", + "Location", + "Site", + "Region", + "Location", + "Site", + ], + "QORIG": ["CRF", "CRF", "CRF", "CRF", "CRF", "CRF"], + "QEVAL": ["", "", "", "", "", ""], + } + ) + ) + + mock_async_get_datasets.return_value = [parent_dataset, supp_dataset] + merged_dataset = DataProcessor.merge_pivot_supp_dataset( + data_service.dataset_implementation, parent_dataset, supp_dataset + ) + assert "ECLOC" in merged_dataset.columns, "ECLOC column should be added from SUPP" + assert "ECSITE" in merged_dataset.columns, "ECSITE column should be added from SUPP" + assert ( + "ECREGION" in merged_dataset.columns + ), "ECREGION column should be added from SUPP" + + row1 = merged_dataset.data[ + (merged_dataset.data["USUBJID"] == "001") + & (merged_dataset.data["ECSEQ"] == "1") + ].iloc[0] + assert row1["ECLOC"] == "Left Arm", "ECLOC should match ECSEQ=1" + assert pd.isna(row1["ECSITE"]), "ECSITE should be NaN for ECSEQ=1" + + row2 = merged_dataset.data[ + (merged_dataset.data["USUBJID"] == "001") + & (merged_dataset.data["ECSEQ"] == "2") + ].iloc[0] + assert row2["ECLOC"] == "Right Arm", "ECLOC should match ECSEQ=2" + assert row2["ECSITE"] == "Site A", "ECSITE should match ECENDY=7" + assert row2["ECREGION"] == "Region 1", "ECREGION should match ECENDY=7" + + row3 = merged_dataset.data[ + (merged_dataset.data["USUBJID"] == "002") + & (merged_dataset.data["ECSEQ"] == "1") + ].iloc[0] + assert row3["ECLOC"] == "Left Leg", "ECLOC should match ECSEQ=1" + assert row3["ECSITE"] == "Site B", "ECSITE should match ECENDY=5" + assert len(merged_dataset.data) == len( + parent_dataset.data + ), "Merged dataset should have same number of rows as parent" + + +@patch.object(LocalDataService, "check_filepath", return_value=False) +@patch.object(LocalDataService, "_async_get_datasets") +def test_merge_supp_dataset_multi_idvar_aggregation( + mock_async_get_datasets, data_service +): + parent_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1"], + "USUBJID": ["001", "001"], + "ECSEQ": ["1", "2"], + "ECENDY": ["5", "7"], + "ECTRT": ["Treatment A", "Treatment B"], + } + ) + ) + supp_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1", "STUDY1"], + "USUBJID": ["001", "001", "001"], + "RDOMAIN": ["EC", "EC", "EC"], + "IDVAR": ["ECENDY", "ECENDY", "ECSEQ"], + "IDVARVAL": ["7", "7", "1"], + "QNAM": ["ECLOC", "ECSITE", "ECLOC"], + "QVAL": ["Location A", "Site A", "Location B"], + "QLABEL": ["Location", "Site", "Location"], + "QORIG": ["CRF", "CRF", "CRF"], + "QEVAL": ["", "", ""], + } + ) + ) + + mock_async_get_datasets.return_value = [parent_dataset, supp_dataset] + + merged_dataset = DataProcessor.merge_pivot_supp_dataset( + data_service.dataset_implementation, parent_dataset, supp_dataset + ) + + assert len(merged_dataset.data) == len( + parent_dataset.data + ), "Should not create duplicate rows when aggregating multiple QNAM for same IDVAR/IDVARVAL" + + # Both ECLOC and ECSITE from ECENDY=7 should be present in the ECSEQ=2 row + row = merged_dataset.data[merged_dataset.data["ECSEQ"] == "2"].iloc[0] + assert row["ECLOC"] == "Location A", "ECLOC from ECENDY=7 should be merged" + assert row["ECSITE"] == "Site A", "ECSITE from ECENDY=7 should be merged" + + +@patch.object(LocalDataService, "check_filepath", return_value=False) +@patch.object(LocalDataService, "_async_get_datasets") +def test_merge_supp_dataset_multi_idvar_same_qnam_validation_error( + mock_async_get_datasets, data_service +): + parent_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1"], + "USUBJID": ["001"], + "ECSEQ": ["1"], + "ECENDY": ["5"], + "ECTRT": ["Treatment A"], + } + ) + ) + + supp_dataset = PandasDataset( + pd.DataFrame( + { + "STUDYID": ["STUDY1", "STUDY1"], + "USUBJID": ["001", "001"], + "RDOMAIN": ["EC", "EC"], + "IDVAR": ["ECSEQ", "ECSEQ"], + "IDVARVAL": ["1", "1"], + "QNAM": ["ECLOC", "ECLOC"], + "QVAL": ["Location A", "Location B"], + "QLABEL": ["Location", "Location"], + "QORIG": ["CRF", "CRF"], + "QEVAL": ["", ""], + } + ) + ) + + mock_async_get_datasets.return_value = [parent_dataset, supp_dataset] + + with pytest.raises(ValueError, match="Multiple records with the same QNAM"): + DataProcessor.merge_pivot_supp_dataset( + data_service.dataset_implementation, parent_dataset, supp_dataset + ) diff --git a/tests/unit/test_rule.py b/tests/unit/test_rule.py index 48285a678..51244bcfc 100644 --- a/tests/unit/test_rule.py +++ b/tests/unit/test_rule.py @@ -86,15 +86,11 @@ def test_valid_parse_actions(): [{"domain_name": "AA", "match_key": ["USUBJID"], "wildcard": "**"}], ), ( - [{"Name": "SUPPEC", "Keys": ["USUBJID"], "Is_Relationship": True}], + [{"Name": "SUPPEC", "Keys": ["USUBJID"]}], [ { "domain_name": "SUPPEC", "match_key": ["USUBJID"], - "relationship_columns": { - "column_with_names": "IDVAR", - "column_with_values": "IDVARVAL", - }, "wildcard": "**", } ], diff --git a/tests/unit/test_rules_engine.py b/tests/unit/test_rules_engine.py index 24183c0ff..7e8bf872e 100644 --- a/tests/unit/test_rules_engine.py +++ b/tests/unit/test_rules_engine.py @@ -1971,13 +1971,14 @@ def test_validate_record_in_parent_domain( "ECREASOC", "ECREASOS", ], + "QVAL": ["Some Value 1", "Some Value 2"], "IDVAR": [ "ECSEQ", "ECSEQ", ], "IDVARVAL": [ - "4.0", - "5.0", + "4", + "5", ], } ) @@ -2017,13 +2018,13 @@ def test_validate_record_in_parent_domain( "executionStatus": "success", "domain": "EC", "dataset": "ec.xpt", - "variables": ["ECPRESP", "QNAM"], + "variables": ["ECPRESP", "ECREASOC"], "message": "Dataset contents is wrong.", "errors": [ { "dataset": "ec.xpt", "row": 4, - "value": {"ECPRESP": "Y", "QNAM": "ECREASOC"}, + "value": {"ECPRESP": "Y", "ECREASOC": "Some Value 1"}, "USUBJID": "CDISC005", "SEQ": 4, } diff --git a/tests/unit/test_utilities/test_data_processor.py b/tests/unit/test_utilities/test_data_processor.py index bee22056d..ddfafbeb2 100644 --- a/tests/unit/test_utilities/test_data_processor.py +++ b/tests/unit/test_utilities/test_data_processor.py @@ -53,259 +53,6 @@ def test_filter_dataset_columns_by_metadata_and_rule(): ] -@pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) -def test_merge_datasets_on_relationship_columns(dataset_implementation): - """ - Unit test for DataProcessor.merge_datasets_on_relationship_columns method. - """ - # prepare data - left_dataset = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - ], - "DOMAIN": [ - "AE", - "AE", - "AE", - ], - "AESEQ": [ - 1, - 2, - 3, - ], - } - ) - right_dataset = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "RDOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "QNAM": [ - "TEST", - "TEST", - "TEST", - "TEST_1", - ], - "IDVAR": [ - "AESEQ", - "AESEQ", - "AESEQ", - "AESEQ", - ], - "IDVARVAL": [ - "1.0", - "2", - "3.0", - "3.0", - ], - } - ) - - # call the tested function and check the results - merged_df = DataProcessor.merge_datasets_on_relationship_columns( - left_dataset=left_dataset, - left_dataset_match_keys=[], - right_dataset=right_dataset, - right_dataset_match_keys=[], - right_dataset_domain_name="SUPPAE", - column_with_names="IDVAR", - column_with_values="IDVARVAL", - ) - merged_df.data = merged_df.data.sort_values("AESEQ") - expected_df = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "DOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "AESEQ": [ - 1.0, - 2.0, - 3.0, - 3.0, - ], - "USUBJID.SUPPAE": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "RDOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "QNAM": [ - "TEST", - "TEST", - "TEST", - "TEST_1", - ], - "IDVAR": [ - "AESEQ", - "AESEQ", - "AESEQ", - "AESEQ", - ], - "IDVARVAL": [ - 1.0, - 2.0, - 3.0, - 3.0, - ], - } - ) - assert merged_df.equals(expected_df) - - -@pytest.mark.parametrize("dataset_implementation", [PandasDataset]) -def test_merge_datasets_on_string_relationship_columns(dataset_implementation): - """ - Unit test for DataProcessor.merge_datasets_on_relationship_columns method. - Test the case when the columns that describe the relation - are of a string type. - """ - # prepare data - left_dataset = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - ], - "DOMAIN": [ - "AE", - "AE", - "AE", - ], - "AESEQ": [ - "CDISC_IA", - "CDISC_IB", - "CDISC_IC", - ], - } - ) - right_dataset = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "RDOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "QNAM": [ - "TEST", - "TEST", - "TEST", - "TEST_1", - ], - "IDVAR": [ - "AESEQ", - "AESEQ", - "AESEQ", - "AESEQ", - ], - "IDVARVAL": [ - "CDISC_IA", - "CDISC_IB", - "CDISC_IC", - "CDISC_IC", - ], - } - ) - - # call the tested function and check the results - merged_df = DataProcessor.merge_datasets_on_relationship_columns( - left_dataset=left_dataset, - left_dataset_match_keys=[], - right_dataset=right_dataset, - right_dataset_match_keys=[], - right_dataset_domain_name="SUPPAE", - column_with_names="IDVAR", - column_with_values="IDVARVAL", - ) - expected_df = dataset_implementation.from_dict( - { - "USUBJID": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "DOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "AESEQ": [ - "CDISC_IA", - "CDISC_IB", - "CDISC_IC", - "CDISC_IC", - ], - "USUBJID.SUPPAE": [ - "CDISC01", - "CDISC01", - "CDISC01", - "CDISC01", - ], - "RDOMAIN": [ - "AE", - "AE", - "AE", - "AE", - ], - "QNAM": [ - "TEST", - "TEST", - "TEST", - "TEST_1", - ], - "IDVAR": [ - "AESEQ", - "AESEQ", - "AESEQ", - "AESEQ", - ], - "IDVARVAL": [ - "CDISC_IA", - "CDISC_IB", - "CDISC_IC", - "CDISC_IC", - ], - } - ) - assert merged_df.equals(expected_df) - - @pytest.mark.parametrize( "join_type, expected_df", [ @@ -465,3 +212,138 @@ def test_merge_datasets_on_join_type(join_type: JoinTypes, expected_df: PandasDa join_type=join_type, ) assert merged_df.equals(expected_df) + + +@pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) +def test_merge_pivot_supp_dataset_single_idvar(dataset_implementation): + left_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC01", "CDISC01"], + "DOMAIN": ["AE", "AE", "AE"], + "AESEQ": [1, 2, 3], + "AETERM": ["Headache", "Nausea", "Fatigue"], + } + ) + right_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC01", "CDISC01"], + "RDOMAIN": ["AE", "AE", "AE"], + "IDVAR": ["AESEQ", "AESEQ", "AESEQ"], + "IDVARVAL": ["1", "2", "3"], + "QNAM": ["AESPID", "AESPID", "AESPID"], + "QVAL": ["SP001", "SP002", "SP003"], + } + ) + + merged_df = DataProcessor.merge_pivot_supp_dataset( + dataset_implementation=dataset_implementation, + left_dataset=left_dataset, + right_dataset=right_dataset, + ) + if isinstance(merged_df, DaskDataset): + result_data = merged_df.data.compute() + else: + result_data = merged_df.data + + # Verify pivot + assert "AESPID" in merged_df.columns, "AESPID column should be created from QNAM" + assert "QNAM" not in merged_df.columns, "QNAM should be dropped after pivot" + assert "QVAL" not in merged_df.columns, "QVAL should be dropped after pivot" + assert result_data[result_data["AESEQ"] == "1"]["AESPID"].values[0] == "SP001" + assert result_data[result_data["AESEQ"] == "2"]["AESPID"].values[0] == "SP002" + assert result_data[result_data["AESEQ"] == "3"]["AESPID"].values[0] == "SP003" + assert len(result_data) == 3 + + +@pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) +def test_merge_pivot_supp_dataset_multiple_idvar(dataset_implementation): + left_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC01", "CDISC01"], + "DOMAIN": ["EC", "EC", "EC"], + "ECSEQ": [1, 2, 3], + "ECENDY": [5, 7, 10], + "ECTRT": ["Treatment A", "Treatment B", "Treatment C"], + } + ) + right_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC01", "CDISC01", "CDISC01"], + "RDOMAIN": ["EC", "EC", "EC", "EC"], + "IDVAR": ["ECSEQ", "ECSEQ", "ECENDY", "ECENDY"], + "IDVARVAL": ["1", "2", "7", "10"], + "QNAM": ["ECLOC", "ECLOC", "ECSITE", "ECSITE"], + "QVAL": ["Left Arm", "Right Arm", "Site A", "Site B"], + } + ) + + merged_df = DataProcessor.merge_pivot_supp_dataset( + dataset_implementation=dataset_implementation, + left_dataset=left_dataset, + right_dataset=right_dataset, + ) + if isinstance(merged_df, DaskDataset): + result_data = merged_df.data.compute() + else: + result_data = merged_df.data + + # Verify pivot + assert "ECLOC" in merged_df.columns, "ECLOC column should be created from QNAM" + assert "ECSITE" in merged_df.columns, "ECSITE column should be created from QNAM" + assert "QNAM" not in merged_df.columns, "QNAM should be dropped after pivot" + assert "QVAL" not in merged_df.columns, "QVAL should be dropped after pivot" + + row1 = result_data[result_data["ECSEQ"] == "1"].iloc[0] + assert row1["ECLOC"] == "Left Arm" + assert pd.isna(row1["ECSITE"]) + row2 = result_data[result_data["ECSEQ"] == "2"].iloc[0] + assert row2["ECLOC"] == "Right Arm" + assert row2["ECSITE"] == "Site A" + row3 = result_data[result_data["ECSEQ"] == "3"].iloc[0] + assert pd.isna(row3["ECLOC"]) + assert row3["ECSITE"] == "Site B" + assert len(result_data) == 3 + + +@pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) +def test_merge_pivot_supp_dataset_blank_idvar(dataset_implementation): + left_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC02"], + "DOMAIN": ["DM", "DM"], + "AGE": [45, 52], + } + ) + right_dataset = dataset_implementation.from_dict( + { + "USUBJID": ["CDISC01", "CDISC02"], + "RDOMAIN": ["DM", "DM"], + "IDVAR": ["", ""], # Blank IDVAR + "IDVARVAL": ["", ""], + "QNAM": ["DMPOPFLAG", "DMPOPFLAG"], + "QVAL": ["Y", "N"], + } + ) + merged_df = DataProcessor.merge_pivot_supp_dataset( + dataset_implementation=dataset_implementation, + left_dataset=left_dataset, + right_dataset=right_dataset, + ) + if isinstance(merged_df, DaskDataset): + result_data = merged_df.data.compute() + else: + result_data = merged_df.data + + # Verify pivot + assert ( + "DMPOPFLAG" in merged_df.columns + ), "DMPOPFLAG column should be created from QNAM" + assert "QNAM" not in merged_df.columns, "QNAM should be dropped after pivot" + assert "QVAL" not in merged_df.columns, "QVAL should be dropped after pivot" + assert ( + result_data[result_data["USUBJID"] == "CDISC01"]["DMPOPFLAG"].values[0] == "Y" + ) + assert ( + result_data[result_data["USUBJID"] == "CDISC02"]["DMPOPFLAG"].values[0] == "N" + ) + assert len(result_data) == 2