diff --git a/cdisc_rules_engine/dataset_builders/dataset_metadata_define_dataset_builder.py b/cdisc_rules_engine/dataset_builders/dataset_metadata_define_dataset_builder.py
index 177ed5c5c..d36838a5e 100644
--- a/cdisc_rules_engine/dataset_builders/dataset_metadata_define_dataset_builder.py
+++ b/cdisc_rules_engine/dataset_builders/dataset_metadata_define_dataset_builder.py
@@ -1,3 +1,4 @@
+from cdisc_rules_engine.models.dataset import DatasetInterface
 from cdisc_rules_engine.services import logger
 from cdisc_rules_engine.dataset_builders.base_dataset_builder import BaseDatasetBuilder
 import os
@@ -55,9 +56,15 @@ def build(self):
             if self.dataset_metadata.full_path
             else None
         )
-        matching_row = merged_cleaned[
+        matching_row: DatasetInterface = merged_cleaned[
             merged_cleaned["dataset_location"].str.lower() == dataset_filename
         ]
+        if matching_row.empty:
+            # when using DASK, dataset_filename refers to a temp parquet filename
+            matching_row: DatasetInterface = merged_cleaned[
+                merged_cleaned["dataset_location"].str.lower()
+                == self.dataset_metadata.original_path.lower()
+            ]
         for column in merged.columns:
             merged[column] = matching_row[column].iloc[0]
         return merged
@@ -93,20 +100,22 @@ def _get_dataset_dataframe(self):
         else:
             datasets = self.dataset_implementation()
         for dataset in self.datasets:
+            ds_metadata = None
             try:
                 ds_metadata = self.data_service.get_dataset_metadata(
-                    dataset.filename
+                    dataset_name=dataset.filename
+                )
+                ds_metadata.data["dataset_domain"] = getattr(
+                    dataset, "domain", None
                 )
-                ds_metadata.data["dataset_domain"] = dataset.domain
             except Exception as e:
                 logger.trace(e)
                 logger.error(f"Error: {e}. Error message: {str(e)}")
-            datasets.data = (
-                ds_metadata.data
-                if datasets.data.empty
-                else datasets.data.append(ds_metadata.data)
-            )
-
+            if ds_metadata:
+                if datasets.data.empty:
+                    datasets.data = ds_metadata.data.copy()
+                else:
+                    datasets.data = datasets.concat(ds_metadata).data
         if datasets.data.empty or len(datasets.data) == 0:
             dataset_df = self.dataset_implementation(columns=dataset_col_order)
             logger.info(f"No datasets metadata is provided for {__name__}.")
diff --git a/cdisc_rules_engine/services/data_services/dummy_data_service.py b/cdisc_rules_engine/services/data_services/dummy_data_service.py
index f22247f97..f163b0a14 100644
--- a/cdisc_rules_engine/services/data_services/dummy_data_service.py
+++ b/cdisc_rules_engine/services/data_services/dummy_data_service.py
@@ -4,7 +4,7 @@
 import os
 
 import pandas as pd
-
+import tempfile
 from cdisc_rules_engine.dummy_models.dummy_dataset import DummyDataset
 from cdisc_rules_engine.exceptions.custom_exceptions import (
     DatasetNotFoundError,
@@ -156,7 +156,24 @@ def __get_dataset_metadata(self, dataset_name: str, **kwargs) -> dict:
         return metadata_to_return
 
-    def to_parquet(self, file_path: str) -> str:
-        return ""
+    def to_parquet(self, file_path: str) -> tuple:
+        """
+        Save the dataset whose full_path matches file_path to a parquet file.
+        Returns a tuple of the number of rows and the path to the saved
+        parquet file, or (0, "") if no matching dataset is found.
+ """ + for dataset in self.data: + if hasattr(dataset, "full_path") and dataset.full_path == file_path: + # Convert the DummyDataset's data (assumed to be a DataFrame or dict-like) to a pandas DataFrame + if hasattr(dataset, "data"): + df = pd.DataFrame(dataset.data) + else: + # fallback: try to convert the whole object to dict + df = pd.DataFrame([dataset.__dict__]) + temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet") + df.to_parquet(temp_file.name) + return len(df.index), temp_file.name + return 0, "" def get_datasets(self) -> Iterable[SDTMDatasetMetadata]: return self.data diff --git a/tests/unit/test_rules_engine.py b/tests/unit/test_rules_engine.py index 8573651cb..c27468bfb 100644 --- a/tests/unit/test_rules_engine.py +++ b/tests/unit/test_rules_engine.py @@ -1278,33 +1278,29 @@ def test_validate_single_dataset_not_equal_to( PandasDataset( pd.DataFrame.from_dict( { - "dataset_name": [ - "AE", - ], - "dataset_label": [ - "Adverse Events", - ], - "dataset_location": [ - "te.xpt", - ], + "dataset_name": ["AE"], + "dataset_label": ["Adverse"], + "dataset_location": ["ae.xpt"], } ) ), [ { - "executionStatus": "skipped", + "executionStatus": "success", "dataset": "ae.xpt", "domain": "AE", - "variables": [], - "message": "rule evaluation error - evaluation dataset failed to build", + "variables": ["dataset_label", "dataset_location", "dataset_name"], + "message": "Dataset metadata does not correspond to Define XML", "errors": [ { + "value": { + "dataset_name": "AE", + "dataset_location": "ae.xpt", + "dataset_label": "Adverse", + }, "dataset": "ae.xpt", - "error": "Error occurred during dataset building", - "message": "Failed to build dataset for rule validation. Builder: " - "DatasetMetadataDefineDatasetBuilder, Dataset: AE, " - "Error: single positional indexer is out-of-bounds", - } + "row": 1, + }, ], } ], @@ -1320,15 +1316,9 @@ def test_validate_single_dataset_not_equal_to( PandasDataset( pd.DataFrame.from_dict( { - "dataset_name": [ - "AE", - ], - "dataset_label": [ - "Adverse Events", - ], - "dataset_location": [ - "ae.xpt", - ], + "dataset_name": ["AE"], + "dataset_label": ["Adverse Events"], + "dataset_location": ["ae.xpt"], } ) ), @@ -1372,6 +1362,7 @@ def test_validate_dataset_metadata_against_define_xml( first_record={"DOMAIN": "AE"}, full_path="CDISC01/test/ae.xpt", filename="ae.xpt", + original_path="ae.xpt", ) validation_result: List[dict] = RulesEngine( standard="sdtmig"