diff --git a/cdisc_rules_engine/check_operators/dataframe_operators.py b/cdisc_rules_engine/check_operators/dataframe_operators.py index 6de068ed5..09ff499fb 100644 --- a/cdisc_rules_engine/check_operators/dataframe_operators.py +++ b/cdisc_rules_engine/check_operators/dataframe_operators.py @@ -1200,6 +1200,7 @@ def is_inconsistent_across_dataset(self, other_value): def is_unique_set(self, other_value): target = self.replace_prefix(other_value.get("target")) comparator = other_value.get("comparator") + regex_pattern = other_value.get("regex") values = [target, comparator] target_data = flatten_list(self.value, values) target_names = [] @@ -1209,6 +1210,13 @@ def is_unique_set(self, other_value): target_names.append(target_name) target_names = list(set(target_names)) df_group = self.value[target_names].copy() + if regex_pattern: + for col in df_group.columns: + df_group[col] = df_group[col].apply( + lambda x: ( + apply_regex(regex_pattern, x) if isinstance(x, str) and x else x + ) + ) df_group = df_group.fillna("_NaN_") group_sizes = df_group.groupby(target_names).size() counts = df_group.apply(tuple, axis=1).map(group_sizes) diff --git a/cdisc_rules_engine/operations/base_operation.py b/cdisc_rules_engine/operations/base_operation.py index 3ac0e1205..baf3ea20e 100644 --- a/cdisc_rules_engine/operations/base_operation.py +++ b/cdisc_rules_engine/operations/base_operation.py @@ -191,52 +191,36 @@ def _rename_grouping_columns(self, data): ) def _get_grouping_columns(self) -> List[str]: - if any(item.startswith("$") for item in self.params.grouping): - return self._expand_operation_results_in_grouping(self.params.grouping) + expanded = self._expand_operation_results_in_grouping(self.params.grouping) + if not self.params.grouping_aliases: + return expanded else: - return ( - self.params.grouping - if not self.params.grouping_aliases - else [ - ( - self.params.grouping_aliases[i] - if 0 <= i < len(self.params.grouping_aliases) - else v - ) - for i, v in 
enumerate(self.params.grouping) - ] - ) + return [ + ( + self.params.grouping_aliases[i] + if 0 <= i < len(self.params.grouping_aliases) + else v + ) + for i, v in enumerate(expanded) + ] def _expand_operation_results_in_grouping(self, grouping_list): expanded = [] for item in grouping_list: - if item.startswith("$") and item in self.evaluation_dataset.columns: + if item in self.evaluation_dataset.columns: operation_col = self.evaluation_dataset[item] first_val = operation_col.iloc[0] - if operation_col.astype(str).nunique() == 1: - if isinstance(first_val, (list, tuple)): - expanded.extend(first_val) - else: - expanded.append(item) + if ( + isinstance(first_val, (list, tuple)) + and operation_col.astype(str).nunique() == 1 + ): + expanded.extend(first_val) else: - expanded.extend(self._collect_values_from_column(operation_col)) + expanded.append(item) else: expanded.append(item) return list(dict.fromkeys(expanded)) - def _collect_values_from_column(self, operation_col): - seen = [] - for val in operation_col: - if val is not None: - if isinstance(val, (list, tuple)): - for v in val: - if v not in seen: - seen.append(v) - else: - if val not in seen: - seen.append(val) - return seen - def _get_variables_metadata_from_standard(self) -> List[dict]: # TODO: Update to handle other standard types: adam, cdash, etc. 
target_metadata = None diff --git a/cdisc_rules_engine/operations/record_count.py b/cdisc_rules_engine/operations/record_count.py index 6a6dfc2d8..01ae600c1 100644 --- a/cdisc_rules_engine/operations/record_count.py +++ b/cdisc_rules_engine/operations/record_count.py @@ -56,8 +56,8 @@ def _build_effective_grouping(self) -> tuple[list, dict]: ) effective_grouping = [] for col in grouping_cols: - if col in self.params.dataframe.columns: - sample_val = self.params.dataframe[col].iloc[0] + if col in self.evaluation_dataset.data.columns: + sample_val = self.evaluation_dataset[col].iloc[0] if isinstance(sample_val, (list, tuple)): # This is an operation result - expand the list effective_grouping.extend(sample_val) diff --git a/cdisc_rules_engine/operations/variable_is_null.py b/cdisc_rules_engine/operations/variable_is_null.py index a60cf6702..58fa93197 100644 --- a/cdisc_rules_engine/operations/variable_is_null.py +++ b/cdisc_rules_engine/operations/variable_is_null.py @@ -14,7 +14,7 @@ def _execute_operation(self): ] return self.data_service.dataset_implementation().convert_to_series(result) else: - target_variable = self.params.target.replace("--", self.params.domain, 1) + target_variable = self.params.target return self._is_target_variable_null(dataframe, target_variable) def _is_target_variable_null(self, dataframe, target_variable: str) -> bool: diff --git a/cdisc_rules_engine/utilities/rule_processor.py b/cdisc_rules_engine/utilities/rule_processor.py index e2b135b35..cc21d7ad8 100644 --- a/cdisc_rules_engine/utilities/rule_processor.py +++ b/cdisc_rules_engine/utilities/rule_processor.py @@ -12,6 +12,7 @@ LibraryMetadataContainer, ) +import copy import os from cdisc_rules_engine.constants.classes import ( FINDINGS_ABOUT, @@ -597,6 +598,50 @@ def add_comparator_to_rule_conditions( f"comparator={comparator}, conditions={rule['conditions']}" ) + def _preprocess_operation_params( + self, operation_params: OperationParams, domain_details: dict = None + ) -> 
OperationParams: + # uses shallow copy to not overwrite for subsequent + # operations and avoids costly deepcopy of dataframe + params_copy = copy.copy(operation_params) + current_domain = params_copy.domain + if domain_details.is_supp: + current_domain = domain_details.rdomain + for param_name in vars(params_copy): + if param_name in ("datasets", "dataframe"): + continue + param_value = getattr(params_copy, param_name) + updated_value = self._replace_wildcards_in_value( + param_value, current_domain + ) + if updated_value is not param_value: + updated_value = copy.deepcopy(updated_value) + setattr(params_copy, param_name, updated_value) + return params_copy + + def _replace_wildcards_in_value(self, value, domain: str): + if value is None: + return value + if isinstance(value, str): + return value.replace("--", domain) + elif isinstance(value, list): + return [self._replace_wildcards_in_value(item, domain) for item in value] + elif isinstance(value, set): + return {self._replace_wildcards_in_value(item, domain) for item in value} + elif isinstance(value, dict): + return { + self._replace_wildcards_in_value( + k, domain + ): self._replace_wildcards_in_value(v, domain) + for k, v in value.items() + } + elif isinstance(value, tuple): + return tuple( + self._replace_wildcards_in_value(item, domain) for item in value + ) + else: + return value + @staticmethod def duplicate_conditions_for_all_targets( conditions: ConditionInterface, targets: List[str] diff --git a/resources/schema/Operator.md b/resources/schema/Operator.md index ae7fdcf16..16e0f5a89 100644 --- a/resources/schema/Operator.md +++ b/resources/schema/Operator.md @@ -1,5 +1,7 @@ # Check Operator +NOTE: Complementary operators have access to the same parameter arguments unless otherwise stated. + ## Relational Basic value comparisons and presence checks for evaluating equality, inequality, ranges, and whether values exist or are empty.
@@ -854,17 +856,34 @@ Relationship Integrity Check > `name` can be a variable containing a list of columns and `value` does not need to be present +> The `regex` parameter allows you to extract portions of values using a regex pattern before checking uniqueness. + +> Compare date only (YYYY-MM-DD) for uniqueness + ```yaml -Rule Type: Dataset Contents Check against Define XML -Check: - all: - - name: define_dataset_key_sequence # contains list of dataset key columns - operator: is_unique_set +- name: "--REPNUM" + operator: is_not_unique_set + value: + - "USUBJID" + - "--TESTCD" + - "$TIMING_VARIABLES" + regex: '^\d{4}-\d{2}-\d{2}' +``` + +> Compare by first N characters of a string + +```yaml +- name: "ITEM_ID" + operator: is_not_unique_set + value: + - "USUBJID" + - "CATEGORY" + regex: "^.{2}" ``` ### is_not_unique_set -Complement of `is_unique_set` +Complement of `is_unique_set`. > --SEQ is not unique within DOMAIN, USUBJID, and --TESTCD diff --git a/tests/unit/test_check_operators/test_value_set_checks.py b/tests/unit/test_check_operators/test_value_set_checks.py index c3901629f..459b78a76 100644 --- a/tests/unit/test_check_operators/test_value_set_checks.py +++ b/tests/unit/test_check_operators/test_value_set_checks.py @@ -58,6 +58,42 @@ def test_is_not_unique_set(target, comparator, dataset_type, expected_result): assert result.equals(df.convert_to_series(expected_result)) +@pytest.mark.parametrize( + "target, comparator, regex, dataset_type, expected_result", + [ + ( + "ARM", + "DTC", + r"^\d{4}-\d{2}-\d{2}", + PandasDataset, + [False, False, False, False], + ), + ("ARM", "TAE", None, PandasDataset, [False, False, True, True]), + ], +) +def test_is_unique_set_with_regex( + target, comparator, regex, dataset_type, expected_result +): + data = { + "ARM": ["PLACEBO", "PLACEBO", "ACTIVE", "ACTIVE"], + "TAE": [1, 1, 1, 2], + "DTC": [ + "2024-01-15T10:30:00", + "2024-01-15T14:45:00", + "2024-01-16T10:30:00", + "2024-01-16T14:45:00", + ], + } + df = 
dataset_type.from_dict(data) + params = {"target": target, "comparator": comparator} + if regex is not None: + params["regex"] = regex + result = DataframeType( + {"value": df, "column_prefix_map": {"--": "AR"}} + ).is_unique_set(params) + assert result.equals(df.convert_to_series(expected_result)) + + @pytest.mark.parametrize( "target, comparator, dataset_type, expected_result", [ diff --git a/tests/unit/test_operations/test_variable_is_null.py b/tests/unit/test_operations/test_variable_is_null.py index e662a974b..286e57339 100644 --- a/tests/unit/test_operations/test_variable_is_null.py +++ b/tests/unit/test_operations/test_variable_is_null.py @@ -42,7 +42,7 @@ def test_variable_is_null( config = ConfigService() cache = CacheServiceFactory(config).get_cache_service() operation_params.dataframe = data - operation_params.target = "--VAR" + operation_params.target = "AEVAR" operation_params.domain = "AE" mock_data_service.get_dataset.return_value = data mock_data_service.dataset_implementation = data.__class__ diff --git a/tests/unit/test_utilities/test_rule_processor.py b/tests/unit/test_utilities/test_rule_processor.py index 0ac4cb582..3b8cc3466 100644 --- a/tests/unit/test_utilities/test_rule_processor.py +++ b/tests/unit/test_utilities/test_rule_processor.py @@ -23,6 +23,7 @@ INTERVENTIONS, ) from cdisc_rules_engine.models.dataset import PandasDataset, DaskDataset +from cdisc_rules_engine.models.operation_params import OperationParams @pytest.mark.parametrize( @@ -479,29 +480,34 @@ def test_perform_rule_operation(mock_data_service, dataset_implementation): df = dataset_implementation.from_dict( {"AESTDY": [11, 12, 40, 59, 59], "DOMAIN": ["AE", "AE", "AE", "AE", "AE"]} ) - processor = RuleProcessor(mock_data_service, InMemoryCacheService()) - with patch( - "cdisc_rules_engine.services.data_services.LocalDataService.get_dataset", - return_value=df, - ): - result = processor.perform_rule_operations( - rule, - df, - "AE", - [{"domain": "AE", "filename": "ae.xpt"}], 
- "test/", - standard="sdtmig", - standard_version="3-1-2", - standard_substandard=None, + datasets = [ + SDTMDatasetMetadata( + filename="ae.xpt", + full_path="test/ae.xpt", + name="AE", + label="Adverse Events", ) - assert "$avg_aestdy" in result - assert "$unique_aestdy" in result - assert "$max_aestdy" in result - assert "$min_aestdy" in result - assert result["$max_aestdy"][0] == df["AESTDY"].max() - assert result["$min_aestdy"][0] == df["AESTDY"].min() - assert result["$avg_aestdy"][0] == df["AESTDY"].mean() - assert result["$unique_aestdy"].equals(pd.Series([{11, 12, 40, 59}] * len(df))) + ] + mock_data_service.get_dataset.return_value = df + processor = RuleProcessor(mock_data_service, InMemoryCacheService()) + result = processor.perform_rule_operations( + rule, + df, + "AE", + datasets, + "test/", + standard="sdtmig", + standard_version="3-1-2", + standard_substandard=None, + ) + assert "$avg_aestdy" in result + assert "$unique_aestdy" in result + assert "$max_aestdy" in result + assert "$min_aestdy" in result + assert result["$max_aestdy"][0] == df["AESTDY"].max() + assert result["$min_aestdy"][0] == df["AESTDY"].min() + assert result["$avg_aestdy"][0] == df["AESTDY"].mean() + assert result["$unique_aestdy"].equals(pd.Series([{11, 12, 40, 59}] * len(df))) @pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) @@ -569,57 +575,64 @@ def test_perform_rule_operation_with_grouping( "DOMAIN": ["AE", "AE", "AE", "AE"], } ) - processor = RuleProcessor(mock_data_service, InMemoryCacheService()) - with patch( - "cdisc_rules_engine.services.data_services.LocalDataService.get_dataset", - return_value=df, - ): - data = processor.perform_rule_operations( - rule, - df, - "AE", - [{"domain": "AE", "filename": "ae.xpt"}], - "test/", - standard="sdtmig", - standard_version="3-1-2", - standard_substandard=None, + + datasets = [ + SDTMDatasetMetadata( + filename="ae.xpt", + full_path="test/ae.xpt", + name="AE", + label="Adverse Events", ) - assert 
"$avg_aestdy" in data - assert data["$avg_aestdy"].values.tolist() == [25, 35, 25, 35] - assert "$max_aestdy" in data - assert data["$max_aestdy"].values.tolist() == [40, 59, 40, 59] - assert "$min_aestdy" in data - assert data["$min_aestdy"].values.tolist() == [10, 11, 10, 11] - assert data[["USUBJID", "$unique_aestdy"]].equals( - pd.DataFrame.from_dict( - { - "USUBJID": [ - 1, - 200, - 1, - 200, - ], - "$unique_aestdy": [ - { - 10, - 40, - }, - { - 11, - 59, - }, - { - 10, - 40, - }, - { - 11, - 59, - }, - ], - } - ) + ] + + mock_data_service.get_dataset.return_value = df + processor = RuleProcessor(mock_data_service, InMemoryCacheService()) + data = processor.perform_rule_operations( + rule, + df, + "AE", + datasets, + "test/", + standard="sdtmig", + standard_version="3-1-2", + standard_substandard=None, + ) + assert "$avg_aestdy" in data + assert data["$avg_aestdy"].values.tolist() == [25, 35, 25, 35] + assert "$max_aestdy" in data + assert data["$max_aestdy"].values.tolist() == [40, 59, 40, 59] + assert "$min_aestdy" in data + assert data["$min_aestdy"].values.tolist() == [10, 11, 10, 11] + assert data[["USUBJID", "$unique_aestdy"]].equals( + pd.DataFrame.from_dict( + { + "USUBJID": [ + 1, + 200, + 1, + 200, + ], + "$unique_aestdy": [ + { + 10, + 40, + }, + { + 11, + 59, + }, + { + 10, + 40, + }, + { + 11, + 59, + }, + ], + } ) + ) @pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) @@ -680,27 +693,34 @@ def test_perform_rule_operation_with_multi_key_grouping( "STUDYID": ["A", "A", "A", "A", "B", "B"], } ) - processor = RuleProcessor(mock_data_service, InMemoryCacheService()) - with patch( - "cdisc_rules_engine.services.data_services.LocalDataService.get_dataset", - return_value=df, - ): - data = processor.perform_rule_operations( - rule, - df, - "AE", - [{"domain": "AE", "filename": "ae.xpt"}], - "test/", - standard="sdtmig", - standard_version="3-1-2", - standard_substandard=None, + + datasets = [ + SDTMDatasetMetadata( + 
filename="ae.xpt", + full_path="test/ae.xpt", + name="AE", + label="Adverse Events", ) - assert "$avg_aestdy" in data - assert data["$avg_aestdy"].values.tolist() == [25, 35, 25, 35, 30, 112] - assert "$max_aestdy" in data - assert data["$max_aestdy"].values.tolist() == [40, 59, 40, 59, 30, 112] - assert "$min_aestdy" in data - assert data["$min_aestdy"].values.tolist() == [10, 11, 10, 11, 30, 112] + ] + + mock_data_service.get_dataset.return_value = df + processor = RuleProcessor(mock_data_service, InMemoryCacheService()) + data = processor.perform_rule_operations( + rule, + df, + "AE", + datasets, + "test/", + standard="sdtmig", + standard_version="3-1-2", + standard_substandard=None, + ) + assert "$avg_aestdy" in data + assert data["$avg_aestdy"].values.tolist() == [25, 35, 25, 35, 30, 112] + assert "$max_aestdy" in data + assert data["$max_aestdy"].values.tolist() == [40, 59, 40, 59, 30, 112] + assert "$min_aestdy" in data + assert data["$min_aestdy"].values.tolist() == [10, 11, 10, 11, 30, 112] @pytest.mark.parametrize("dataset_implementation", [PandasDataset, DaskDataset]) @@ -734,12 +754,20 @@ def test_perform_rule_operation_with_null_operations( df = dataset_implementation.from_dict( {"AESTDY": [11, 12, 40, 59], "USUBJID": [1, 200, 1, 200]} ) + datasets = [ + SDTMDatasetMetadata( + filename="ae.xpt", + full_path="test/ae.xpt", + name="AE", + label="Adverse Events", + ) + ] processor = RuleProcessor(mock_data_service, InMemoryCacheService()) new_data = processor.perform_rule_operations( rule, df, "AE", - [{"domain": "AE", "filename": "ae.xpt"}], + datasets, "test/", standard="sdtmig", standard_version="3-1-2", @@ -748,6 +776,68 @@ def test_perform_rule_operation_with_null_operations( assert df.equals(new_data) +def test_preprocess_operation_params_wildcard_replacement(mock_data_service): + processor = RuleProcessor(mock_data_service, InMemoryCacheService()) + df = PandasDataset.from_dict({"AESEQ": [1, 2, 3]}) + operation_params = OperationParams( + 
core_id="test_id", + operation_id="test_op", + operation_name="test_operator", + dataframe=df, + target="--SEQ", + original_target="--SEQ", + domain="AE", + dataset_path="test/ae.xpt", + directory_path="test/", + datasets=[], + standard="sdtmig", + standard_version="3-4", + grouping=["--SEQ", "--DTC", "USUBJID"], + filter={"--STAT": "COMPLETED"}, + ) + domain_details = SDTMDatasetMetadata( + filename="ae.xpt", full_path="test/ae.xpt", name="AE", label="Adverse Events" + ) + result = processor._preprocess_operation_params(operation_params, domain_details) + assert result.target == "AESEQ" + assert result.original_target == "AESEQ" + assert result.grouping == ["AESEQ", "AEDTC", "USUBJID"] + assert result.filter == {"AESTAT": "COMPLETED"} + # Check that original params and dataframe are not modified + assert operation_params.target == "--SEQ" + assert operation_params.grouping == ["--SEQ", "--DTC", "USUBJID"] + assert result.dataframe is operation_params.dataframe + + +def test_preprocess_operation_params_supp_domain_uses_rdomain(mock_data_service): + processor = RuleProcessor(mock_data_service, InMemoryCacheService()) + df = PandasDataset.from_dict({"AESEQ": [1, 2, 3]}) + operation_params = OperationParams( + core_id="test_id", + operation_id="test_op", + operation_name="test_operator", + dataframe=df, + target="--SEQ", + original_target="--SEQ", + domain=None, + dataset_path="test/suppae.xpt", + directory_path="test/", + datasets=[], + standard="sdtmig", + standard_version="3-4", + ) + domain_details = SDTMDatasetMetadata( + filename="suppae.xpt", + full_path="test/suppae.xpt", + name="SUPPAE", + label="Supplemental AE", + first_record={"RDOMAIN": "AE"}, + ) + result = processor._preprocess_operation_params(operation_params, domain_details) + assert result.target == "AESEQ" + assert result.original_target == "AESEQ" + + @patch( "cdisc_rules_engine.services.data_services.LocalDataService.get_dataset_metadata" )