Skip to content
Merged

Cg0418 #1614

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 120 additions & 49 deletions cdisc_rules_engine/check_operators/dataframe_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -1357,14 +1357,16 @@ def is_unique_relationship(self, other_value):
def is_ordered_set(self, other_value):
target = other_value.get("target")
value = other_value.get("comparator")
if not isinstance(value, str):
raise Exception("Comparator must be a single String value")
if not isinstance(value, (str, list)):
raise Exception("Comparator must be a String or list of Strings")
if isinstance(value, list) and not all(isinstance(v, str) for v in value):
raise Exception("All comparator values must be Strings")
return self.value.is_column_sorted_within(value, target)

@log_operator_execution
@type_operator(FIELD_DATAFRAME)
def is_not_ordered_set(self, other_value):
return not self.is_ordered_set(other_value)
return ~self.is_ordered_set(other_value)

@log_operator_execution
@type_operator(FIELD_DATAFRAME)
Expand Down Expand Up @@ -1637,47 +1639,93 @@ def value_has_multiple_references(self, other_value: dict):
def value_does_not_have_multiple_references(self, other_value: dict):
return ~self.value_has_multiple_references(other_value)

def check_basic_sort_order(self, group, target, comparator, ascending):
def check_target_ascending_in_sorted_group(self, group, target, comparator):
"""
Check if target values are in ascending order within a group
already sorted by comparator.
- Null comparator or null target: mark that row as False
- Only check ascending order between rows where both are non-null
"""
is_valid = pd.Series(True, index=group.index)
target_values = group[target].tolist()
comparator_values = group[comparator].tolist()
is_sorted = pd.Series(True, index=group.index)

def safe_compare(x, index):
if pd.isna(x):
is_sorted.loc[index] = False
return "9999-12-31" if ascending else "0001-01-01"
return x

expected_order = sorted(
range(len(comparator_values)),
key=lambda k: safe_compare(comparator_values[k], group.index[k]),
reverse=not ascending,
)
actual_order = sorted(range(len(target_values)), key=lambda k: target_values[k])

mismatches = np.array(expected_order) != np.array(actual_order)
is_sorted.iloc[mismatches] = False
is_numeric_target = pd.api.types.is_numeric_dtype(group[target])

# Mark any row with null comparator or null target as False
for i in range(len(target_values)):
if pd.isna(comparator_values[i]) or pd.isna(target_values[i]):
is_valid.iloc[i] = False

# Only check ascending order on rows where both target and comparator are non-null
valid_positions = [
i
for i in range(len(target_values))
if not pd.isna(comparator_values[i]) and not pd.isna(target_values[i])
]

for i in range(len(valid_positions) - 1):
curr_pos = valid_positions[i]
next_pos = valid_positions[i + 1]
current = target_values[curr_pos]
next_val = target_values[next_pos]

if (
not is_numeric_target
and is_valid_date(current)
and is_valid_date(next_val)
):
date1, _ = parse_date(current)
date2, _ = parse_date(next_val)
if date1 > date2:
is_valid.iloc[curr_pos] = False
is_valid.iloc[next_pos] = False
else:
if current > next_val:
is_valid.iloc[curr_pos] = False
is_valid.iloc[next_pos] = False

return is_sorted
return is_valid

def check_date_overlaps(self, group, target, comparator):
"""
Check for date overlaps in comparator column.
When dates have different precisions and overlap, mark them as invalid.
Only applies to date columns - returns all True for numeric columns.
Skips null comparator values.
"""
comparator_values = group[comparator].tolist()
is_sorted = pd.Series(True, index=group.index)
is_valid = pd.Series(True, index=group.index)
is_numeric = pd.api.types.is_numeric_dtype(group[comparator])

if is_numeric:
return is_valid

# Only check non-null comparator values
valid_positions = [
i
for i in range(len(comparator_values))
if not pd.isna(comparator_values[i])
]

for i in range(len(valid_positions) - 1):
curr_pos = valid_positions[i]
next_pos = valid_positions[i + 1]
current = comparator_values[curr_pos]
next_val = comparator_values[next_pos]

if is_valid_date(current) and is_valid_date(next_val):
date1, prec1 = parse_date(current)
date2, prec2 = parse_date(next_val)

for i in range(len(comparator_values) - 1):
if is_valid_date(comparator_values[i]) and is_valid_date(
comparator_values[i + 1]
):
date1, prec1 = parse_date(comparator_values[i])
date2, prec2 = parse_date(comparator_values[i + 1])
if prec1 != prec2:
overlaps, less_precise = dates_overlap(date1, prec1, date2, prec2)
if overlaps and date1.startswith(less_precise):
is_sorted.iloc[i] = False
elif overlaps and date2.startswith(less_precise):
is_sorted.iloc[i + 1] = False
if overlaps:
if date1.startswith(less_precise):
is_valid.iloc[curr_pos] = False
elif date2.startswith(less_precise):
is_valid.iloc[next_pos] = False

return is_sorted
return is_valid

def _process_grouped_result(
self,
Expand Down Expand Up @@ -1717,38 +1765,58 @@ def _process_grouped_result(
@type_operator(FIELD_DATAFRAME)
def target_is_sorted_by(self, other_value: dict):
"""
Checking the sort order based on comparators, including date overlap checks
Check if target is in ascending order when rows are sorted by comparator.

Nulls in either target or comparator are marked False and excluded
from the ascending order check.

Process:
1. Sort data by within columns (always ASC) and comparator (ASC/DESC)
2. Within each group:
- Mark null comparator or null target rows as False
- Check remaining rows: is target ascending?
- Check for date overlaps in comparator (if dates)
3. Map results back to original row order
"""
target = other_value.get("target")
within_columns = self._normalize_grouping_columns(other_value.get("within"))
columns = other_value["comparator"]

result = pd.Series([True] * len(self.value), index=self.value.index)

for col in columns:
comparator: str = self.replace_prefix(col["name"])
ascending: bool = col["sort_order"].lower() != "desc"
na_pos: str = col["null_position"]

selected_columns = list(
dict.fromkeys([target, comparator, *within_columns])
)

# Sort by within columns (always ASC) and comparator in specified order
sorted_df = self.value[selected_columns].sort_values(
by=[*within_columns, comparator],
ascending=ascending,
na_position=na_pos,
ascending=[True] * len(within_columns) + [ascending],
)
grouped_df = sorted_df.groupby(within_columns)
basic_sort_check = grouped_df.apply(
lambda x: self.check_basic_sort_order(x, target, comparator, ascending)

grouped_df = sorted_df.groupby(within_columns, sort=False)

# Check 1: Target is ascending in sorted groups, nulls marked False
target_check = grouped_df.apply(
lambda x: self.check_target_ascending_in_sorted_group(
x, target, comparator
)
)
basic_sort_check = self._process_grouped_result(
basic_sort_check,
target_check = self._process_grouped_result(
target_check,
grouped_df,
within_columns,
sorted_df,
lambda group: self.check_basic_sort_order(
group, target, comparator, ascending
lambda group: self.check_target_ascending_in_sorted_group(
group, target, comparator
),
)

# Check 2: No date overlaps in comparator (only for date columns)
date_overlap_check = grouped_df.apply(
lambda x: self.check_date_overlaps(x, target, comparator)
)
Expand All @@ -1759,15 +1827,18 @@ def target_is_sorted_by(self, other_value: dict):
sorted_df,
lambda group: self.check_date_overlaps(group, target, comparator),
)
combined_check = basic_sort_check & date_overlap_check
result = result.reindex(sorted_df.index, fill_value=True)
result = result & combined_check
result = result.reindex(self.value.index, fill_value=True)

# Combine both checks
combined_check = target_check & date_overlap_check

# Map results back to original dataframe order
result = result & combined_check.reindex(self.value.index, fill_value=True)

if isinstance(result, (pd.DataFrame, dd.DataFrame)):
if isinstance(result, dd.DataFrame):
result = result.compute()
result = result.squeeze()

return result

@log_operator_execution
Expand Down
27 changes: 14 additions & 13 deletions cdisc_rules_engine/models/dataset/dask_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import dask.dataframe as dd
import dask.array as da
import pandas as pd
import numpy as np
import re
import dask
from typing import List, Union
Expand Down Expand Up @@ -59,19 +58,21 @@ def __getitem__(self, item):
raise

def is_column_sorted_within(self, group, column):
return (
False
not in np.concatenate(
self._data.groupby(group, sort=False)[column]
.apply(
lambda partition: sorted(partition.sort_index().values)
== partition.sort_index().values
)
.compute()
.values
if isinstance(group, str):
group = [group]

def check_partition(partition):
sorted_vals = sorted(partition.values)
return pd.Series(
[a == b for a, b in zip(partition.values, sorted_vals)],
index=partition.index,
)
.ravel()
.tolist()

return (
self._data.compute()
.groupby(group)[column]
.transform(check_partition)
.sort_index()
)

def __setitem__(self, key, value):
Expand Down
18 changes: 11 additions & 7 deletions cdisc_rules_engine/models/dataset/pandas_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,17 @@ def get_grouped_size(self, by, **kwargs):
return grouped_data.size()

def is_column_sorted_within(self, group, column):
return (
False
not in self.groupby(group)[column]
.apply(list)
.map(lambda x: sorted(x) == x)
.values
)
if isinstance(group, str):
group = [group]

def check_partition(partition):
sorted_vals = sorted(partition.values)
return pd.Series(
[a == b for a, b in zip(partition.values, sorted_vals)],
index=partition.index,
)

return self.groupby(group)[column].transform(check_partition)

def concat(self, other: Union[DatasetInterface, List[DatasetInterface]], **kwargs):
if isinstance(other, list):
Expand Down
4 changes: 0 additions & 4 deletions resources/schema/rule/Operator.json
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,6 @@
"items": {
"properties": {
"name": { "$ref": "Operator.json#/properties/name" },
"null_position": {
"enum": ["first", "last"],
"type": "string"
},
"order": { "$ref": "Operator.json#/properties/order" }
},
"type": "object"
Expand Down
15 changes: 12 additions & 3 deletions resources/schema/rule/Operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ Complement of `has_next_corresponding_record`

### is_ordered_set

True if the dataset rows are in ascending order of the values within `name`, grouped by the values within `value`
True if the dataset rows are in ascending order of the values within `name`, grouped by the values within `value`. `value` can be either a single column or a list of columns.

```yaml
Check:
Expand All @@ -1069,6 +1069,16 @@ Check:
value: USUBJID
```

```yaml
Check:
all:
- name: --SEQ
operator: is_ordered_set
value:
- USUBJID
- "--TESTCD"
```

### is_ordered_by

True if the dataset rows are ordered by the values within `name`, given the ordering specified by `order`
Expand All @@ -1087,7 +1097,7 @@ Complement of `is_ordered_by`

### target_is_sorted_by

True if the values in `name` are ordered according to the values specified by `value` grouped by the values in `within`. Each `value` requires a variable `name`, ordering specified by `order`, and the null position specified by `null_position`. `within` accepts either a single column or an ordered list of columns.
True if the values in `name` are ordered according to the values specified by `value` in ascending or descending order, grouped by the values in `within`. Each `value` entry requires a variable `name` and an ordering of 'asc' or 'desc' specified by `sort_order`. `within` accepts either a single column or an ordered list of columns. Columns can contain either numeric values or character dates in ISO 8601 'YYYY-MM-DD' format.

```yaml
Check:
Expand All @@ -1100,7 +1110,6 @@ Check:
value:
- name: --STDTC
sort_order: asc
null_position: last
```

### target_is_not_sorted_by
Expand Down
Loading
Loading