-
Notifications
You must be signed in to change notification settings - Fork 27
operator rework #1638
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
operator rework #1638
Changes from all commits
08d141e
fc95245
789f4ba
f9911a9
b5b23d2
3f9535c
833d8f2
554fa70
cefc4eb
051ad08
a2bc0c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1642,91 +1642,176 @@ def value_has_multiple_references(self, other_value: dict): | |
| def value_does_not_have_multiple_references(self, other_value: dict): | ||
| return ~self.value_has_multiple_references(other_value) | ||
|
|
||
| def check_target_ascending_in_sorted_group(self, group, target, comparator): | ||
| def _mark_invalid_null_positions(self, is_valid, group, null_mask, na_pos): | ||
| null_indices = group[null_mask].index.tolist() | ||
| non_null_indices = group[~null_mask].index.tolist() | ||
| index_order = group.index.tolist() | ||
|
|
||
| if not null_indices or not non_null_indices: | ||
| return is_valid | ||
|
|
||
| if na_pos == "last": | ||
| last_non_null = max(index_order.index(i) for i in non_null_indices) | ||
| first_null = min(index_order.index(i) for i in null_indices) | ||
| if first_null < last_non_null: | ||
| is_valid[null_mask] = False | ||
| else: | ||
| last_null = max(index_order.index(i) for i in null_indices) | ||
| first_non_null = min(index_order.index(i) for i in non_null_indices) | ||
| if last_null > first_non_null: | ||
| is_valid[null_mask] = False | ||
|
|
||
| return is_valid | ||
|
|
||
| def _verify_neighbor_consistency( | ||
| self, | ||
| is_valid, | ||
| non_null_rows, | ||
| target, | ||
| comparator, | ||
| ascending, | ||
| is_numeric_comparator, | ||
| ): | ||
| indices = non_null_rows.index.tolist() | ||
| comparator_vals = non_null_rows[comparator].tolist() | ||
|
|
||
| for i, idx in enumerate(indices): | ||
| if not is_valid.loc[idx]: | ||
| continue | ||
|
|
||
| curr = comparator_vals[i] | ||
| if self._is_null_or_empty(curr): | ||
| continue | ||
|
|
||
| prev = next( | ||
| ( | ||
| comparator_vals[j] | ||
| for j in range(i - 1, -1, -1) | ||
| if not self._is_null_or_empty(comparator_vals[j]) | ||
| ), | ||
| None, | ||
| ) | ||
| nxt = next( | ||
| ( | ||
| comparator_vals[j] | ||
| for j in range(i + 1, len(comparator_vals)) | ||
| if not self._is_null_or_empty(comparator_vals[j]) | ||
| ), | ||
| None, | ||
| ) | ||
| if not is_numeric_comparator and not is_valid_date(str(curr)): | ||
| continue | ||
|
|
||
| if ascending: | ||
| if prev is not None and curr < prev: | ||
| is_valid.loc[idx] = False | ||
| elif nxt is not None and curr > nxt: | ||
| is_valid.loc[idx] = False | ||
| else: | ||
| if prev is not None and curr > prev: | ||
| is_valid.loc[idx] = False | ||
| elif nxt is not None and curr < nxt: | ||
| is_valid.loc[idx] = False | ||
|
|
||
| return is_valid | ||
|
|
||
| def check_target_ascending_in_sorted_group( | ||
| self, group, target, comparator, ascending, na_pos | ||
| ): | ||
| """ | ||
| Check if target values are in ascending order within a group | ||
| already sorted by comparator. | ||
| - Null comparator or null target: mark that row as False | ||
| - Only check ascending order between rows where both are non-null | ||
| """ | ||
| is_valid = pd.Series(True, index=group.index) | ||
| target_values = group[target].tolist() | ||
| comparator_values = group[comparator].tolist() | ||
| is_numeric_comparator = pd.api.types.is_numeric_dtype(group[comparator]) | ||
| is_numeric_target = pd.api.types.is_numeric_dtype(group[target]) | ||
|
|
||
| # Mark any row with null comparator or null target as False | ||
| for i in range(len(target_values)): | ||
| if pd.isna(comparator_values[i]) or pd.isna(target_values[i]): | ||
| is_valid.iloc[i] = False | ||
| null_mask = group[comparator].isna() | ( | ||
| group[comparator].astype(str).str.strip() == "" | ||
| ) | ||
| non_null_rows = group[~null_mask] | ||
|
|
||
| # Only check ascending order on rows where both target and comparator are non-null | ||
| valid_positions = [ | ||
| i | ||
| for i in range(len(target_values)) | ||
| if not pd.isna(comparator_values[i]) and not pd.isna(target_values[i]) | ||
| is_valid = self._mark_invalid_null_positions(is_valid, group, null_mask, na_pos) | ||
| overlap_check = self.check_date_overlaps(group, target, comparator) | ||
| overlap_invalid_mask = ~overlap_check | ||
| non_null_rows_for_order_check = non_null_rows[ | ||
| ~overlap_invalid_mask[non_null_rows.index] | ||
| ] | ||
|
|
||
| for i in range(len(valid_positions) - 1): | ||
| curr_pos = valid_positions[i] | ||
| next_pos = valid_positions[i + 1] | ||
| current = target_values[curr_pos] | ||
| next_val = target_values[next_pos] | ||
|
|
||
| if ( | ||
| non_null_sorted = non_null_rows_for_order_check.sort_values( | ||
| by=comparator, ascending=ascending | ||
| ) | ||
| actual_target = non_null_rows_for_order_check[target].tolist() | ||
| expected_target = non_null_sorted[target].tolist() | ||
| non_null_indices = non_null_rows_for_order_check.index.tolist() | ||
|
|
||
| for i in range(len(actual_target)): | ||
| actual = actual_target[i] | ||
| expected_val = expected_target[i] | ||
|
|
||
| if pd.isna(actual) and pd.isna(expected_val): | ||
| continue | ||
| elif pd.isna(actual) or pd.isna(expected_val): | ||
| is_valid.loc[non_null_indices[i]] = False | ||
| elif ( | ||
| not is_numeric_target | ||
| and is_valid_date(current) | ||
| and is_valid_date(next_val) | ||
| and is_valid_date(actual) | ||
| and is_valid_date(expected_val) | ||
| ): | ||
| date1, _ = parse_date(current) | ||
| date2, _ = parse_date(next_val) | ||
| if date1 > date2: | ||
| is_valid.iloc[curr_pos] = False | ||
| is_valid.iloc[next_pos] = False | ||
| date1, _ = parse_date(actual) | ||
| date2, _ = parse_date(expected_val) | ||
| if date1 != date2: | ||
| is_valid.loc[non_null_indices[i]] = False | ||
| else: | ||
| if current > next_val: | ||
| is_valid.iloc[curr_pos] = False | ||
| is_valid.iloc[next_pos] = False | ||
|
|
||
| if actual != expected_val: | ||
| is_valid.loc[non_null_indices[i]] = False | ||
|
|
||
| non_null_target_sorted = non_null_rows.sort_values(by=target, ascending=True) | ||
| is_valid = self._verify_neighbor_consistency( | ||
| is_valid, | ||
| non_null_target_sorted, | ||
| target, | ||
| comparator, | ||
| ascending, | ||
| is_numeric_comparator, | ||
| ) | ||
| return is_valid | ||
|
|
||
| def check_date_overlaps(self, group, target, comparator): | ||
| """ | ||
| Check for date overlaps in comparator column. | ||
| When dates have different precisions and overlap, mark them as invalid. | ||
| Only applies to date columns - returns all True for numeric columns. | ||
| Skips null comparator values. | ||
| """ | ||
| comparator_values = group[comparator].tolist() | ||
| is_valid = pd.Series(True, index=group.index) | ||
| is_numeric = pd.api.types.is_numeric_dtype(group[comparator]) | ||
|
|
||
| if is_numeric: | ||
| return is_valid | ||
|
|
||
| # Only check non-null comparator values | ||
| valid_positions = [ | ||
| i | ||
| for i in range(len(comparator_values)) | ||
| if not pd.isna(comparator_values[i]) | ||
| if not ( | ||
| pd.isna(comparator_values[i]) or str(comparator_values[i]).strip() == "" | ||
| ) | ||
| ] | ||
|
|
||
| for i in range(len(valid_positions) - 1): | ||
| for i in range(len(valid_positions)): | ||
| curr_pos = valid_positions[i] | ||
| next_pos = valid_positions[i + 1] | ||
| current = comparator_values[curr_pos] | ||
| next_val = comparator_values[next_pos] | ||
|
|
||
| if is_valid_date(current) and is_valid_date(next_val): | ||
| date1, prec1 = parse_date(current) | ||
| date2, prec2 = parse_date(next_val) | ||
|
|
||
| if not is_valid_date(current): | ||
| continue | ||
| _, prec1 = parse_date(current) | ||
| for j in range(len(valid_positions)): | ||
| if i == j: | ||
| continue | ||
| other_pos = valid_positions[j] | ||
| other = comparator_values[other_pos] | ||
| if not is_valid_date(other): | ||
| continue | ||
| _, prec2 = parse_date(other) | ||
| if prec1 != prec2: | ||
| overlaps, less_precise = dates_overlap(date1, prec1, date2, prec2) | ||
| overlaps, _ = dates_overlap(current, prec1, other, prec2) | ||
| if overlaps: | ||
| if date1.startswith(less_precise): | ||
| is_valid.iloc[curr_pos] = False | ||
| elif date2.startswith(less_precise): | ||
| is_valid.iloc[next_pos] = False | ||
| is_valid.iloc[curr_pos] = False | ||
| is_valid.iloc[other_pos] = False | ||
|
|
||
| return is_valid | ||
|
|
||
|
|
@@ -1767,20 +1852,6 @@ def _process_grouped_result( | |
| @log_operator_execution | ||
| @type_operator(FIELD_DATAFRAME) | ||
| def target_is_sorted_by(self, other_value: dict): | ||
| """ | ||
| Check if target is in ascending order when rows are sorted by comparator. | ||
|
|
||
| Nulls in either target or comparator are marked False and excluded | ||
| from the ascending order check. | ||
|
|
||
| Process: | ||
| 1. Sort data by within columns (always ASC) and comparator (ASC/DESC) | ||
| 2. Within each group: | ||
| - Mark null comparator or null target rows as False | ||
| - Check remaining rows: is target ascending? | ||
| - Check for date overlaps in comparator (if dates) | ||
| 3. Map results back to original row order | ||
| """ | ||
| target = other_value.get("target") | ||
| within_columns = self._normalize_grouping_columns(other_value.get("within")) | ||
| columns = other_value["comparator"] | ||
|
|
@@ -1790,23 +1861,22 @@ def target_is_sorted_by(self, other_value: dict): | |
| for col in columns: | ||
| comparator: str = self.replace_prefix(col["name"]) | ||
| ascending: bool = col["sort_order"].lower() != "desc" | ||
| na_pos: str = col.get("null_position", "last") | ||
|
|
||
| selected_columns = list( | ||
| dict.fromkeys([target, comparator, *within_columns]) | ||
| ) | ||
|
|
||
| # Sort by within columns (always ASC) and comparator in specified order | ||
| sorted_df = self.value[selected_columns].sort_values( | ||
| by=[*within_columns, comparator], | ||
| ascending=[True] * len(within_columns) + [ascending], | ||
| by=[*within_columns, target], | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you please explain why we are changing the sorting to
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So, technically, we want both the target and the comparator to be in order. If we sort by the comparator and then check the order of the target, it is the same as sorting by the target and checking the comparator. |
||
| ascending=[True] * (len(within_columns) + 1), | ||
| ) | ||
|
|
||
| grouped_df = sorted_df.groupby(within_columns, sort=False) | ||
|
|
||
| # Check 1: Target is ascending in sorted groups, nulls marked False | ||
| target_check = grouped_df.apply( | ||
| lambda x: self.check_target_ascending_in_sorted_group( | ||
| x, target, comparator | ||
| x, target, comparator, ascending, na_pos | ||
| ) | ||
| ) | ||
| target_check = self._process_grouped_result( | ||
|
|
@@ -1815,11 +1885,10 @@ def target_is_sorted_by(self, other_value: dict): | |
| within_columns, | ||
| sorted_df, | ||
| lambda group: self.check_target_ascending_in_sorted_group( | ||
| group, target, comparator | ||
| group, target, comparator, ascending, na_pos | ||
| ), | ||
| ) | ||
|
|
||
| # Check 2: No date overlaps in comparator (only for date columns) | ||
| date_overlap_check = grouped_df.apply( | ||
| lambda x: self.check_date_overlaps(x, target, comparator) | ||
| ) | ||
|
|
@@ -1831,10 +1900,7 @@ def target_is_sorted_by(self, other_value: dict): | |
| lambda group: self.check_date_overlaps(group, target, comparator), | ||
| ) | ||
|
|
||
| # Combine both checks | ||
| combined_check = target_check & date_overlap_check | ||
|
|
||
| # Map results back to original dataframe order | ||
| result = result & combined_check.reindex(self.value.index, fill_value=True) | ||
|
|
||
| if isinstance(result, (pd.DataFrame, dd.DataFrame)): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we please add a test for the new behavior where, if there is an overlap with different precision, we mark both as invalid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added