Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from cdisc_rules_engine.models.dataset import DatasetInterface
from cdisc_rules_engine.services import logger
from cdisc_rules_engine.dataset_builders.base_dataset_builder import BaseDatasetBuilder
import os
Expand Down Expand Up @@ -55,9 +56,15 @@ def build(self):
if self.dataset_metadata.full_path
else None
)
matching_row = merged_cleaned[
matching_row: DatasetInterface = merged_cleaned[
merged_cleaned["dataset_location"].str.lower() == dataset_filename
]
if matching_row.empty:
# when using DASK dataset_filename refers to temp parquet filename
matching_row: DatasetInterface = merged_cleaned[
merged_cleaned["dataset_location"].str.lower()
== self.dataset_metadata.original_path.lower()
]
for column in merged.columns:
merged[column] = matching_row[column].iloc[0]
return merged
Expand Down Expand Up @@ -93,20 +100,22 @@ def _get_dataset_dataframe(self):
else:
datasets = self.dataset_implementation()
for dataset in self.datasets:
ds_metadata = None
try:
ds_metadata = self.data_service.get_dataset_metadata(
dataset.filename
dataset_name=dataset.filename
)
ds_metadata.data["dataset_domain"] = getattr(
dataset, "domain", None
)
ds_metadata.data["dataset_domain"] = dataset.domain
except Exception as e:
logger.trace(e)
logger.error(f"Error: {e}. Error message: {str(e)}")
datasets.data = (
ds_metadata.data
if datasets.data.empty
else datasets.data.append(ds_metadata.data)
)

if ds_metadata:
if datasets.data.empty:
datasets.data = ds_metadata.data.copy()
else:
datasets.data = datasets.concat(ds_metadata).data
if datasets.data.empty or len(datasets.data) == 0:
dataset_df = self.dataset_implementation(columns=dataset_col_order)
logger.info(f"No datasets metadata is provided for {__name__}.")
Expand Down
19 changes: 17 additions & 2 deletions cdisc_rules_engine/services/data_services/dummy_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import os
import pandas as pd

import tempfile
from cdisc_rules_engine.dummy_models.dummy_dataset import DummyDataset
from cdisc_rules_engine.exceptions.custom_exceptions import (
DatasetNotFoundError,
Expand Down Expand Up @@ -156,7 +156,22 @@ def __get_dataset_metadata(self, dataset_name: str, **kwargs) -> dict:
return metadata_to_return

def to_parquet(self, file_path: str) -> "tuple[int, str]":
    """
    Save the dataset whose ``full_path`` equals *file_path* to a temporary
    parquet file.

    Returns:
        A ``(row_count, parquet_path)`` tuple, or ``(0, "")`` when no dataset
        in ``self.data`` matches *file_path*.

    NOTE: the original signature annotated ``-> str`` but the body returns a
    two-tuple; the annotation is corrected here (no caller-visible change).
    """
    for dataset in self.data:
        # Only datasets that expose a matching full_path are candidates.
        if hasattr(dataset, "full_path") and dataset.full_path == file_path:
            if hasattr(dataset, "data"):
                # dataset.data is assumed to be DataFrame- or dict-like;
                # pd.DataFrame accepts either — TODO confirm against DummyDataset
                df = pd.DataFrame(dataset.data)
            else:
                # Fallback: serialize the object's attributes as a single row.
                df = pd.DataFrame([dataset.__dict__])
            # delete=False so the file survives this scope for the caller to
            # read; the caller is responsible for cleaning it up.
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet")
            df.to_parquet(temp_file.name)
            return len(df.index), temp_file.name
    return 0, ""

def get_datasets(self) -> Iterable[SDTMDatasetMetadata]:
    """Expose the dummy service's preloaded dataset metadata collection."""
    return self.data
Expand Down
43 changes: 17 additions & 26 deletions tests/unit/test_rules_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1278,33 +1278,29 @@ def test_validate_single_dataset_not_equal_to(
PandasDataset(
pd.DataFrame.from_dict(
{
"dataset_name": [
"AE",
],
"dataset_label": [
"Adverse Events",
],
"dataset_location": [
"te.xpt",
],
"dataset_name": ["AE"],
"dataset_label": ["Adverse"],
"dataset_location": ["ae.xpt"],
}
)
),
[
{
"executionStatus": "skipped",
"executionStatus": "success",
"dataset": "ae.xpt",
"domain": "AE",
"variables": [],
"message": "rule evaluation error - evaluation dataset failed to build",
"variables": ["dataset_label", "dataset_location", "dataset_name"],
"message": "Dataset metadata does not correspond to Define XML",
"errors": [
{
"value": {
"dataset_name": "AE",
"dataset_location": "ae.xpt",
"dataset_label": "Adverse",
},
"dataset": "ae.xpt",
"error": "Error occurred during dataset building",
"message": "Failed to build dataset for rule validation. Builder: "
"DatasetMetadataDefineDatasetBuilder, Dataset: AE, "
"Error: single positional indexer is out-of-bounds",
}
"row": 1,
},
],
}
],
Expand All @@ -1320,15 +1316,9 @@ def test_validate_single_dataset_not_equal_to(
PandasDataset(
pd.DataFrame.from_dict(
{
"dataset_name": [
"AE",
],
"dataset_label": [
"Adverse Events",
],
"dataset_location": [
"ae.xpt",
],
"dataset_name": ["AE"],
"dataset_label": ["Adverse Events"],
"dataset_location": ["ae.xpt"],
}
)
),
Expand Down Expand Up @@ -1372,6 +1362,7 @@ def test_validate_dataset_metadata_against_define_xml(
first_record={"DOMAIN": "AE"},
full_path="CDISC01/test/ae.xpt",
filename="ae.xpt",
original_path="ae.xpt",
)
validation_result: List[dict] = RulesEngine(
standard="sdtmig"
Expand Down
Loading