From 063837a8823a4494f89a1260051c9e2bed2ff395 Mon Sep 17 00:00:00 2001 From: matteo Date: Tue, 1 Jul 2025 12:04:31 +0200 Subject: [PATCH 1/2] chore(tests): typed singling out evaluator tests. --- tests/test_singling_out_evaluator.py | 57 ++++++++++++---------------- 1 file changed, 24 insertions(+), 33 deletions(-) diff --git a/tests/test_singling_out_evaluator.py b/tests/test_singling_out_evaluator.py index 540a2c1..ced713d 100644 --- a/tests/test_singling_out_evaluator.py +++ b/tests/test_singling_out_evaluator.py @@ -1,6 +1,8 @@ # This file is part of Anonymeter and is released under BSD 3-Clause Clear License. # Copyright (c) 2022 Anonos IP LLC. # See https://github.com/statice/anonymeter/blob/main/LICENSE.md for details. +from typing import Optional + import numpy as np import pandas as pd import polars as pl @@ -20,19 +22,17 @@ @pytest.mark.parametrize("mode", ["univariate", "multivariate"]) -def test_so_general(mode): +def test_so_general(mode: str) -> None: ori = get_adult("ori", n_samples=10) syn = get_adult("syn", n_samples=10) - soe = SinglingOutEvaluator(ori=ori, syn=syn, n_attacks=5).evaluate( - mode=mode - ) + soe = SinglingOutEvaluator(ori=ori, syn=syn, n_attacks=5).evaluate(mode=mode) for q in soe.queries(): assert len(soe._syn.filter(q)) == 1 assert len(soe._ori.filter(q)) == 1 -def test_singling_out_queries_unique(): +def test_singling_out_queries_unique() -> None: df = pl.DataFrame({"c1": [1], "c2": [2]}) queries = UniqueSinglingOutQueries(max_size=2) @@ -47,7 +47,7 @@ def test_singling_out_queries_unique(): assert [str(q) for q in queries.queries] == [str(q1), str(q2)] -def test_singling_out_queries_same_characters(): +def test_singling_out_queries_same_characters() -> None: df = pl.DataFrame([{"c": 1.2}, {"c": 2.1}]) queries = UniqueSinglingOutQueries(max_size=2) @@ -62,7 +62,7 @@ def test_singling_out_queries_same_characters(): assert queries.queries == [q1, q2] -def test_singling_out_queries(): +def test_singling_out_queries() -> None: df = 
pl.DataFrame({"c1": [1, 1], "c2": [2, 3]}) queries = UniqueSinglingOutQueries(max_size=2) @@ -87,7 +87,7 @@ def test_singling_out_queries(): ((pl.col("c1") == 2) & (pl.col("c2") == "c"), 1), ], ) -def test_evaluate_queries(query, result): +def test_evaluate_queries(query: pl.Expr, result: Optional[int]) -> None: df = pl.DataFrame({"c1": [0, 0, 2], "c2": ["a", "a", "c"]}) try: out = _evaluate_queries(df=df, queries=[query]) @@ -104,32 +104,29 @@ def test_evaluate_queries(query, result): ], ids=["plain", "with_null_column"], ) -def test_univariate_singling_out_queries(df: pl.DataFrame): - queries = univariate_singling_out_queries( - df=df, n_queries=10, rng=np.random.default_rng(0) - ) +def test_univariate_singling_out_queries(df: pl.DataFrame) -> None: + queries = univariate_singling_out_queries(df=df, n_queries=10, rng=np.random.default_rng(0)) expected = [str(pl.col("col1") == v) for v in ["a", "b", "c", "d"]] assert sorted(map(str, queries)) == sorted(expected) -def test_univariate_singling_out_queries_mixed_null_column(): +def test_univariate_singling_out_queries_mixed_null_column() -> None: df = pl.DataFrame({"col1": ["a", "b", "c", "d"], "col2": [2, 1, 3, None]}) - queries = univariate_singling_out_queries( - df=df, n_queries=10, rng=np.random.default_rng(0) - ) + queries = univariate_singling_out_queries(df=df, n_queries=10, rng=np.random.default_rng(0)) - expected = [str(pl.col("col1") == v) for v in ["a", "b", "c", "d"]] + \ - [str(pl.col("col2") == v) for v in [1, 2, 3]] + \ - [str(pl.col("col2") <= 1), str(pl.col("col2") >= 3), - str(pl.col("col2").is_null())] + expected = ( + [str(pl.col("col1") == v) for v in ["a", "b", "c", "d"]] + + [str(pl.col("col2") == v) for v in [1, 2, 3]] + + [str(pl.col("col2") <= 1), str(pl.col("col2") >= 3), str(pl.col("col2").is_null())] + ) print(sorted(map(str, queries))) print(sorted(expected)) assert sorted(map(str, queries)) == sorted(expected) -def test_singling_out_query_generator(): +def 
test_singling_out_query_generator() -> None: df = pl.DataFrame({"c0": ["a", "b"], "c1": [1.23, 9.87]}) queries = multivariate_singling_out_queries( df=df, n_queries=2, n_cols=2, rng=np.random.default_rng(0), max_attempts=None @@ -152,7 +149,7 @@ def test_singling_out_query_generator(): @pytest.mark.parametrize("confidence_level", [0.5, 0.68, 0.95, 0.99]) @pytest.mark.parametrize("mode", ["univariate", "multivariate"]) -def test_singling_out_risk_estimate(confidence_level, mode): +def test_singling_out_risk_estimate(confidence_level: float, mode: str) -> None: ori = get_adult("ori", 10) soe = SinglingOutEvaluator(ori=ori, syn=ori, n_attacks=5) soe.evaluate(mode=mode) @@ -167,26 +164,20 @@ def test_evaluator_not_evaluated(): @pytest.mark.parametrize("n", [100, 4242, 11235]) -@pytest.mark.parametrize( - "w_min, w_max", [(0, 1), (1 / 10000, 1 / 1000), (0.0013414, 0.2314)] -) -def test_probability_integral(n, w_min, w_max): +@pytest.mark.parametrize("w_min, w_max", [(0, 1), (1 / 10000, 1 / 1000), (0.0013414, 0.2314)]) +def test_probability_integral(n: int, w_min: float, w_max: float) -> None: def _so_probability(n: int, w: float): return n * w * ((1 - w) ** (n - 1)) - desired, _ = integrate.quad( - lambda x: _so_probability(w=x, n=n), a=w_min, b=w_max - ) + desired, _ = integrate.quad(lambda x: _so_probability(w=x, n=n), a=w_min, b=w_max) integral = singling_out_probability_integral(n=n, w_min=w_min, w_max=w_max) np.testing.assert_almost_equal(desired, integral) @pytest.mark.parametrize("max_attempts", [1, 2, 3]) -def test_so_evaluator_max_attempts(max_attempts): +def test_so_evaluator_max_attempts(max_attempts: int) -> None: ori = get_adult("ori", 10) - soe = SinglingOutEvaluator( - ori=ori, syn=ori, n_attacks=10, max_attempts=max_attempts - ) + soe = SinglingOutEvaluator(ori=ori, syn=ori, n_attacks=10, max_attempts=max_attempts) soe.evaluate(mode="multivariate") assert len(soe.queries()) <= max_attempts From d371ff9808eaa1b4db4097d11f7b3537b75fd2d8 Mon Sep 17 00:00:00 
2001 From: matteo Date: Tue, 1 Jul 2025 13:58:50 +0200 Subject: [PATCH 2/2] fix(singling-out): make singling out evaluator robust to weird column names in input data. --- .../evaluators/singling_out_evaluator.py | 160 ++++++++---------- tests/test_singling_out_evaluator.py | 19 +++ 2 files changed, 87 insertions(+), 92 deletions(-) diff --git a/src/anonymeter/evaluators/singling_out_evaluator.py b/src/anonymeter/evaluators/singling_out_evaluator.py index d4a30c4..16d1e1c 100644 --- a/src/anonymeter/evaluators/singling_out_evaluator.py +++ b/src/anonymeter/evaluators/singling_out_evaluator.py @@ -7,6 +7,7 @@ import operator from collections.abc import Sequence from functools import reduce +from keyword import iskeyword from typing import Any, Callable, Optional, Union, cast import numpy as np @@ -20,6 +21,40 @@ logger = logging.getLogger(__name__) +def _safe_column_names(df: pd.DataFrame) -> pd.DataFrame: + """Modify column names in dataframes so that we can use it to build queries. + + Mathematical symbols like `-` or other python keywords (or 'datetime') + in column names are replaced. 
+ + Parameters + ---------- + df : pd.DataFrame + Input dataframe + + Returns + ------- + pd.DataFrame + Dataframe with safe column names + + """ + symbols = ["-", "*", "/", "+"] + replace_with = "_" + replacements = {} + for old_column in df.columns: + new_column = old_column + for symbol in symbols: + if symbol in new_column: + new_column = new_column.replace(symbol, replace_with) + + if iskeyword(new_column) or new_column == "datetime": + new_column = "_anonymeter_" + new_column + + replacements[old_column] = new_column + + return df.rename(columns=replacements) + + def _escape_quotes(string: str) -> str: return string.replace('"', '\\"').replace("'", "\\'") @@ -66,15 +101,14 @@ def _query_from_record( expr = reduce(operator.and_, expr_components) return expr + def _operator_choice( - operators: Sequence[Callable[[Any, Any], bool]], - rng: np.random.Generator + operators: Sequence[Callable[[Any, Any], bool]], rng: np.random.Generator ) -> Callable[[Any, Any], bool]: - return rng.choice(operators) #type: ignore[arg-type] # signature of "choice" does not accept a list of callables but works fine in practice + return rng.choice(operators) # type: ignore[arg-type] # signature of "choice" does not accept a list of callables but works fine in practice + -def _random_operator( - data_type: str, rng: np.random.Generator -) -> Callable[[Any, Any], Union[bool, pl.Expr]]: +def _random_operator(data_type: str, rng: np.random.Generator) -> Callable[[Any, Any], Union[bool, pl.Expr]]: if data_type in ["categorical", "boolean"]: ops: Sequence[Callable[[Any, Any], bool]] = [operator.eq, operator.ne] elif data_type == "numerical": ops = [operator.eq, operator.ne, operator.le, operator.ge] @@ -143,32 +177,20 @@ def _random_queries( rng: np.random.Generator, ) -> list[pl.Expr]: unique_values = {col: df[col].unique().to_list() for col in df.columns} - column_types = { - col: _convert_polars_dtype(df[col].dtype) - for col in df.columns - } + column_types = {col: _convert_polars_dtype(df[col].dtype) for col in df.columns} queries = [] for _
in range(n_queries): - selected_cols = rng.choice( - df.columns, size=n_cols, replace=False - ).tolist() + selected_cols = rng.choice(df.columns, size=n_cols, replace=False).tolist() queries.append( - _random_query( - unique_values=unique_values, - cols=selected_cols, - column_types=column_types, - rng=rng - ) + _random_query(unique_values=unique_values, cols=selected_cols, column_types=column_types, rng=rng) ) return queries -def singling_out_probability_integral( - n: int, w_min: float, w_max: float -) -> float: +def singling_out_probability_integral(n: int, w_min: float, w_max: float) -> float: """Integral of the singling out probability within a given range. The probability that a query singles out in a population of size @@ -198,18 +220,14 @@ def singling_out_probability_integral( """ if w_min < 0 or w_min > 1: - raise ValueError( - f"Parameter `w_min` must be between 0 and 1. Got {w_min} instead." - ) + raise ValueError(f"Parameter `w_min` must be between 0 and 1. Got {w_min} instead.") if w_max < w_min or w_max > 1: raise ValueError( f"Parameter `w_max` must be greater than w_min ({w_min}) and smaller than 1. Got {w_max} instead." ) - return ( - (n * w_min + 1) * (1 - w_min) ** n - (n * w_max + 1) * (1 - w_max) ** n - ) / (n + 1) + return ((n * w_min + 1) * (1 - w_min) ** n - (n * w_max + 1) * (1 - w_max) ** n) / (n + 1) def _measure_queries_success( @@ -233,9 +251,7 @@ def _model(x, w_eff, norm): def _fit_model(sizes: npt.NDArray, successes: npt.NDArray) -> Callable: # initial guesses w_eff_guess = 1 / np.max(sizes) - norm_guess = 1 / singling_out_probability_integral( - n=np.max(sizes), w_min=0, w_max=w_eff_guess - ) + norm_guess = 1 / singling_out_probability_integral(n=np.max(sizes), w_min=0, w_max=w_eff_guess) popt, _ = curve_fit( _model, @@ -265,9 +281,7 @@ def fit_correction_term(df: pl.DataFrame, queries: list[pl.Expr]) -> Callable: depends on the size of the dataset. 
""" - sizes, successes = _measure_queries_success( - df=df, queries=queries, n_repeat=5, n_meas=10 - ) + sizes, successes = _measure_queries_success(df=df, queries=queries, n_repeat=5, n_meas=10) return _fit_model(sizes=sizes, successes=successes) @@ -323,9 +337,7 @@ def queries(self) -> list[pl.Expr]: return self._list -def univariate_singling_out_queries( - df: pl.DataFrame, n_queries: int, rng: np.random.Generator -) -> list[pl.Expr]: +def univariate_singling_out_queries(df: pl.DataFrame, n_queries: int, rng: np.random.Generator) -> list[pl.Expr]: """Generate singling out queries from rare attributes. Parameters @@ -374,7 +386,7 @@ def univariate_singling_out_queries( if len(rare_values) > 0: queries.extend([pl.col(col) == val for val in rare_values]) - rng.shuffle(queries) #type: ignore[arg-type] # signature of "shuffle" does not accept a list of expressions but works fine in practice + rng.shuffle(queries) # type: ignore[arg-type] # signature of "shuffle" does not accept a list of expressions but works fine in practice unique_so_queries = UniqueSinglingOutQueries(max_size=n_queries) unique_so_queries.check_and_extend(queries, df) @@ -444,18 +456,13 @@ def multivariate_singling_out_queries( # Generate a batch of queries # Pre-sample all random row indices - random_indices = rng.integers( - low=0, high=df.shape[0], size=batch_size - ) + random_indices = rng.integers(low=0, high=df.shape[0], size=batch_size) # Extract all records in bulk records = df[random_indices].to_dicts() # Pre-sample all column choices - selected_columns = [ - rng.choice(df.columns, size=n_cols, replace=False).tolist() - for _ in range(batch_size) - ] + selected_columns = [rng.choice(df.columns, size=n_cols, replace=False).tolist() for _ in range(batch_size)] queries_batch = [ _query_from_record( @@ -478,25 +485,16 @@ def multivariate_singling_out_queries( return unique_so_queries.queries -def _evaluate_queries( - df: pl.DataFrame, queries: list[pl.Expr] -) -> tuple[int, ...]: +def 
_evaluate_queries(df: pl.DataFrame, queries: list[pl.Expr]) -> tuple[int, ...]: if len(queries) == 0: return () - result_df = df.select( - [ - q.cast(pl.Int64).sum().alias(f"count_{i}") - for i, q in enumerate(queries) - ] - ) + result_df = df.select([q.cast(pl.Int64).sum().alias(f"count_{i}") for i, q in enumerate(queries)]) counts = result_df.row(0) return counts -def _evaluate_queries_and_return_successful( - df: pl.DataFrame, queries: list[pl.Expr] -) -> list[pl.Expr]: +def _evaluate_queries_and_return_successful(df: pl.DataFrame, queries: list[pl.Expr]) -> list[pl.Expr]: counts = _evaluate_queries(df=df, queries=queries) counts_np = np.array(counts, dtype=float) @@ -520,9 +518,7 @@ def _generate_singling_out_queries( rng: np.random.Generator, ) -> list[pl.Expr]: if mode == "univariate": - queries = univariate_singling_out_queries( - df=df, n_queries=n_attacks, rng=rng - ) + queries = univariate_singling_out_queries(df=df, n_queries=n_attacks, rng=rng) elif mode == "multivariate": queries = multivariate_singling_out_queries( @@ -534,9 +530,7 @@ def _generate_singling_out_queries( ) else: - raise RuntimeError( - f"Parameter `mode` can be either `univariate` or `multivariate`. Got {mode} instead." - ) + raise RuntimeError(f"Parameter `mode` can be either `univariate` or `multivariate`. 
Got {mode} instead.") if len(queries) < n_attacks: logger.warning( @@ -604,8 +598,8 @@ def __init__( max_attempts: Optional[int] = 10000000, seed: Optional[int] = None, ): - ori = pl.DataFrame(ori) - syn = pl.DataFrame(syn) + ori = pl.DataFrame(_safe_column_names(ori)) + syn = pl.DataFrame(_safe_column_names(syn)) self._ori = ori.unique(maintain_order=True) self._syn = syn.unique(maintain_order=True) self._n_attacks = n_attacks @@ -613,7 +607,7 @@ def __init__( if control is None: self._control = None else: - control = pl.DataFrame(control) + control = pl.DataFrame(_safe_column_names(control)) self._control = control.unique(maintain_order=True) self._max_attempts = max_attempts self._queries: list[pl.Expr] = [] @@ -659,9 +653,7 @@ def evaluate(self, mode: str = "multivariate") -> "SinglingOutEvaluator": elif mode == "univariate": n_cols = 1 else: - raise ValueError( - f"mode must be either 'multivariate' or 'univariate', got {mode} instead." - ) + raise ValueError(f"mode must be either 'multivariate' or 'univariate', got {mode} instead.") queries = _generate_singling_out_queries( df=self._syn, @@ -671,9 +663,7 @@ def evaluate(self, mode: str = "multivariate") -> "SinglingOutEvaluator": max_attempts=self._max_attempts, rng=self._rng, ) - self._queries = _evaluate_queries_and_return_successful( - df=self._ori, queries=queries - ) + self._queries = _evaluate_queries_and_return_successful(df=self._ori, queries=queries) self._n_success = len(self._queries) baseline_queries = _random_queries( @@ -682,31 +672,21 @@ def evaluate(self, mode: str = "multivariate") -> "SinglingOutEvaluator": n_cols=n_cols, rng=self._rng, ) - self._baseline_queries = _evaluate_queries_and_return_successful( - df=self._ori, queries=baseline_queries - ) + self._baseline_queries = _evaluate_queries_and_return_successful(df=self._ori, queries=baseline_queries) self._n_baseline = len(self._baseline_queries) if self._control is None: self._n_control = None else: - self._n_control = len( - 
_evaluate_queries_and_return_successful( - df=self._control, queries=queries - ) - ) + self._n_control = len(_evaluate_queries_and_return_successful(df=self._control, queries=queries)) # correct the number of success against the control set # to account for different dataset sizes. if len(self._control) != len(self._ori): # fit the model to the data: - fitted_model = fit_correction_term( - df=self._control, queries=queries - ) + fitted_model = fit_correction_term(df=self._control, queries=queries) - correction = fitted_model(len(self._ori)) / fitted_model( - len(self._control) - ) + correction = fitted_model(len(self._ori)) / fitted_model(len(self._control)) self._n_control *= correction self._evaluated = True @@ -727,9 +707,7 @@ def results(self, confidence_level: float = 0.95) -> EvaluationResults: """ if not self._evaluated: - raise RuntimeError( - "The singling out evaluator wasn't evaluated yet. Please, run `evaluate()` first." - ) + raise RuntimeError("The singling out evaluator wasn't evaluated yet. Please, run `evaluate()` first.") return EvaluationResults( n_attacks=self._n_attacks, @@ -739,9 +717,7 @@ def results(self, confidence_level: float = 0.95) -> EvaluationResults: confidence_level=confidence_level, ) - def risk( - self, confidence_level: float = 0.95, baseline: bool = False - ) -> PrivacyRisk: + def risk(self, confidence_level: float = 0.95, baseline: bool = False) -> PrivacyRisk: """Estimate the singling out risk. 
The risk is estimated comparing the number of successfull singling out diff --git a/tests/test_singling_out_evaluator.py b/tests/test_singling_out_evaluator.py index ced713d..05a79cb 100644 --- a/tests/test_singling_out_evaluator.py +++ b/tests/test_singling_out_evaluator.py @@ -181,3 +181,22 @@ def test_so_evaluator_max_attempts(max_attempts: int) -> None: soe.evaluate(mode="multivariate") assert len(soe.queries()) <= max_attempts + + +@pytest.mark.parametrize("mode", ["univariate", "multivariate"]) +def test_so_weird_column_names(mode: str) -> None: + ori = pd.DataFrame( + { + "capital-gain": [100321.23, -2341.2, 4552.343], + "hr/week": [32, 48, 38], + "datetime": ["11:52", "06:00", "11:11"], + } + ) + + soe = SinglingOutEvaluator( + ori=ori, + syn=ori, + n_attacks=3, + n_cols=3, + ) + soe.evaluate(mode=mode)