Skip to content
Merged

Cg0288 #1419

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cdisc_rules_engine/check_operators/dataframe_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,12 @@ def is_contained_by(self, other_value):
elif self.is_column_of_iterables(comparison_data):
results = vectorized_is_in(target_data, comparison_data)
else:
if isinstance(comparison_data, pd.Series):
comparison_data = comparison_data.apply(
lambda x: list(x) if isinstance(x, set) else x
)
elif isinstance(comparison_data, set):
comparison_data = list(comparison_data)
results = target_data.isin(comparison_data)
return self.value.convert_to_series(results)

Expand Down
1 change: 0 additions & 1 deletion cdisc_rules_engine/models/operation_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class OperationParams:
codelists: list = None
ct_attribute: str = None
ct_package_types: List[str] = None
ct_package: list = None
ct_packages: list = None
ct_version: str = None
ct_package_type: str = None
Expand Down
182 changes: 151 additions & 31 deletions cdisc_rules_engine/operations/get_codelist_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,28 @@
from cdisc_rules_engine.models.dataset import DaskDataset


def _get_ct_package_dask(
row, ct_target, ct_version, ct_packages, standard, substandard
):
if pd.isna(row[ct_version]) or str(row[ct_version]).strip() == "":
return ""
target_val = str(row[ct_target]).strip() if pd.notna(row[ct_target]) else ""
if target_val in ("CDISC", "CDISC CT"):
std = standard.lower()
if "tig" in std:
std = substandard.lower()
if "adam" in std:
prefix = "adamct"
elif "send" in std:
prefix = "sendct"
else:
prefix = "sdtmct"
pkg = f"{prefix}-{row[ct_version]}"
else:
pkg = f"{target_val}-{row[ct_version]}"
return pkg if pkg in ct_packages else ""


class CodeListAttributes(BaseOperation):
"""
A class for fetching codelist attributes for a trial summary domain.
Expand Down Expand Up @@ -42,23 +64,59 @@ def _get_codelist_attributes(self):
ct_name = "CT_PACKAGE" # a column for controlled term package names
# Get controlled term attribute column name specified in rule
ct_attribute = self.params.ct_attribute

ct_target = self.params.target
ct_version = self.params.ct_version
ct_packages = self.params.ct_packages
df = self.params.dataframe
# 2.0 build codelist from cache
# -------------------------------------------------------------------
ct_cache = self._get_ct_from_library_metadata(
ct_key=ct_name, ct_val=ct_attribute
)

# 3.0 get dataset records
# -------------------------------------------------------------------
ct_data = self._get_ct_from_dataset(ct_key=ct_name, ct_val=ct_attribute)
def get_ct_package(row):
if pd.isna(row[ct_version]) or str(row[ct_version]).strip() == "":
return ""
target_val = str(row[ct_target]).strip() if pd.notna(row[ct_target]) else ""
# Handle CDISC CT packages
if target_val in ("CDISC", "CDISC CT"):
standard = self.params.standard.lower()
if "tig" in standard:
# use substandard for relevant TIG CT
standard = self.params.standard_substandard.lower()
if "adam" in standard:
prefix = "adamct"
elif "send" in standard:
prefix = "sendct"
else:
prefix = "sdtmct"
pkg = f"{prefix}-{row[ct_version]}"
else:
# Handle external codelists
pkg = f"{target_val}-{row[ct_version]}"
return pkg if pkg in ct_packages else ""

# 4.0 merge the two datasets by CC
# -------------------------------------------------------------------
cc_key = ct_data[ct_name]
ct_list = ct_cache[(ct_cache[ct_name].isin(cc_key))]
ds_len = self.params.dataframe.len()
result = pd.Series([ct_list[ct_attribute].values[0] for _ in range(ds_len)])
if isinstance(df, DaskDataset):
row_packages = df.data.apply(
_get_ct_package_dask,
axis=1,
meta=(None, "object"),
args=(
ct_target,
ct_version,
ct_packages,
self.params.standard,
self.params.standard_substandard,
),
)
else:
row_packages = df.data.apply(get_ct_package, axis=1)
package_to_codelist = {}
for _, row in ct_cache.iterrows():
package_to_codelist[row[ct_name]] = row[ct_attribute]
result = row_packages.apply(
lambda pkg: package_to_codelist.get(pkg, set()) if pkg else set()
)
return result

def _get_ct_from_library_metadata(self, ct_key: str, ct_val: str):
Expand All @@ -75,14 +133,16 @@ def _get_ct_from_library_metadata(self, ct_key: str, ct_val: str):
retrieved from the cache.
"""
ct_packages = self.params.ct_packages
ct_term_maps = (
[]
if ct_packages is None
else [
ct_term_maps = []
for package in ct_packages:
parts = package.rsplit("-", 3)
if len(parts) >= 4:
ct_package_type = parts[0]
version = "-".join(parts[1:])
self.library_metadata._load_ct_package_data(ct_package_type, version)
ct_term_maps.append(
self.library_metadata.get_ct_package_metadata(package) or {}
for package in ct_packages
]
)
)

# convert codelist to dataframe
ct_result = {ct_key: [], ct_val: []}
Expand Down Expand Up @@ -138,21 +198,81 @@ def _get_ct_from_dataset(self, ct_key: str, ct_val: str):
return result

def _add_codelist(self, ct_key, ct_val, ct_term_maps, ct_result):
"""
Adds codelist information to the result dictionary.

Args:
ct_key (str): The key for identifying the codelist.
ct_val (str): The value associated with the codelist.
ct_term_maps (list[dict]): A list of dictionaries containing
codelist information.
ct_result (dict): The dictionary to store the codelist information.

Returns:
dict: The updated ct_result dictionary.
"""
for item in ct_term_maps:
ct_result[ct_key].append(item.get("package"))
codes = set(code for code in item.keys() if code != "package")
codes = self._extract_codes_by_attribute(item, ct_val)
ct_result[ct_val].append(codes)
return ct_result

def _extract_codes_by_attribute(
self, ct_package_data: dict, ct_attribute: str
) -> set:
submission_lookup = ct_package_data.get("submission_lookup", {})

if ct_attribute == "Term CCODE":
return self._extract_term_codes(submission_lookup)
elif ct_attribute == "Codelist CCODE":
return self._extract_codelist_codes(submission_lookup)
elif ct_attribute in ("Term Value", "Term Submission Value"):
return self._extract_term_values(submission_lookup)
elif ct_attribute == "Codelist Value":
return self._extract_codelist_values(submission_lookup)
elif ct_attribute == "Term Preferred Term":
return self._extract_preferred_terms(submission_lookup, ct_package_data)
else:
raise ValueError(f"Unsupported ct_attribute: {ct_attribute}")

def _extract_codelist_values(self, submission_lookup: dict) -> set:
codes = set()
for term_name, term_data in submission_lookup.items():
term_code = term_data.get("term")
if term_code and term_code == "N/A":
codes.add(term_name)
return codes

def _extract_term_codes(self, submission_lookup: dict) -> set:
codes = set()
for term_data in submission_lookup.values():
term_code = term_data.get("term")
if term_code and term_code != "N/A":
codes.add(term_code)
return codes

def _extract_codelist_codes(self, submission_lookup: dict) -> set:
codes = set()
for term_data in submission_lookup.values():
codelist_code = term_data.get("codelist")
if codelist_code:
codes.add(codelist_code)
return codes

def _extract_term_values(self, submission_lookup: dict) -> set:
codes = set()
for term_name, term_data in submission_lookup.items():
term_code = term_data.get("term")
if term_code and term_code != "N/A":
codes.add(term_name)
return codes

def _extract_preferred_terms(
self, submission_lookup: dict, ct_package_data: dict
) -> set:
codes = set()
for term_name, term_data in submission_lookup.items():
if not isinstance(term_data, dict):
continue
term_code = term_data.get("term")
if not term_code or term_code == "N/A":
continue
codelist_id = term_data.get("codelist")
if not codelist_id or codelist_id not in ct_package_data:
continue
codelist_info = ct_package_data[codelist_id]
terms = codelist_info.get("terms", [])
for term in terms:
if term.get("conceptId") == term_code:
pref_term = term.get("preferredTerm")
if pref_term:
codes.add(pref_term)
break
return codes
1 change: 0 additions & 1 deletion cdisc_rules_engine/rules_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ def __init__(
)
self.data_processor = DataProcessor(self.data_service, self.cache)
self.ct_packages = kwargs.get("ct_packages", [])
self.ct_package = kwargs.get("ct_package")
self.external_dictionaries = external_dictionaries
self.define_xml_path: str = kwargs.get("define_xml_path")
self.validate_xml: bool = kwargs.get("validate_xml")
Expand Down
7 changes: 3 additions & 4 deletions cdisc_rules_engine/utilities/rule_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,17 +390,16 @@ def perform_rule_operations(
standard_version=standard_version,
standard_substandard=standard_substandard,
external_dictionaries=external_dictionaries,
ct_version=operation.get("version"),
ct_version=operation.get("ct_version"),
ct_package_type=RuleProcessor._ct_package_type_api_name(
operation.get("ct_package_type")
),
ct_attribute=operation.get("attribute"),
ct_attribute=operation.get("ct_attribute"),
ct_package_types=[
RuleProcessor._ct_package_type_api_name(ct_package_type)
for ct_package_type in operation.get("ct_package_types", [])
],
ct_packages=kwargs.get("ct_packages"),
ct_package=kwargs.get("codelist_term_maps"),
ct_packages=operation.get("ct_packages", kwargs.get("ct_packages", [])),
attribute_name=operation.get("attribute_name", ""),
key_name=operation.get("key_name", ""),
key_value=operation.get("key_value", ""),
Expand Down
13 changes: 11 additions & 2 deletions resources/schema/Operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,25 @@ Returns a list of valid extensible codelist term's submission values. Used for e

### get_codelist_attributes

Fetches attribute values for a codelist specified in a dataset (like TS)
Fetches controlled terminology attribute values from CT packages based on row-specific CT package and version references.

**Required Parameters:**

- `ct_attribute`: Attribute to extract - `"Term CCODE"`, `"Codelist CCODE"`, `"Term Value"`, `"Codelist Value"`, or `"Term Preferred Term"`
- `target`: Column containing CT reference (e.g., "TSVCDREF")
- `ct_version`: Column containing CT version (e.g., "TSVCDVER")
- `ct_packages`: List of CT packages to search (e.g., `["sdtmct-2020-03-27"]`)

```yaml
- id: $TERM_CCODES
- id: $VALID_TERM_CODES
name: TSVCDREF
operator: get_codelist_attributes
ct_attribute: Term CCODE
ct_version: TSVCDVER
target: TSVCDREF
ct_packages:
- sdtmct-2020-03-27
- sdtmct-2022-12-16
```

### valid_codelist_dates
Expand Down
Loading
Loading