Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 68 additions & 92 deletions src/anonymeter/evaluators/singling_out_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import operator
from collections.abc import Sequence
from functools import reduce
from keyword import iskeyword
from typing import Any, Callable, Optional, Union, cast

import numpy as np
Expand All @@ -20,6 +21,40 @@
logger = logging.getLogger(__name__)


def _safe_column_names(df: pd.DataFrame) -> pd.DataFrame:
"""Modify column names in dataframes so that we can use it to build queries.

Mathematical symbols like `-` or other python keywords (or 'datetime')
in column names are replaced.

Parameters
----------
df : pd.DataFrame
Input dataframe

Returns
-------
pd.DataFrame
Dataframe with safe column names

"""
symbols = ["-", "*", "/", "+"]
replace_with = "_"
replacements = {}
for old_column in df.columns:
new_column = old_column
for symbol in symbols:
if symbol in new_column:
new_column = new_column.replace(symbol, replace_with)

if iskeyword(new_column) or new_column == "datetime":
old_column = "_anonymeter_" + new_column

replacements[old_column] = old_column

return df.rename(columns=replacements)


def _escape_quotes(string: str) -> str:
return string.replace('"', '\\"').replace("'", "\\'")

Expand Down Expand Up @@ -66,15 +101,14 @@ def _query_from_record(
expr = reduce(operator.and_, expr_components)
return expr


def _operator_choice(
operators: Sequence[Callable[[Any, Any], bool]],
rng: np.random.Generator
operators: Sequence[Callable[[Any, Any], bool]], rng: np.random.Generator
) -> Callable[[Any, Any], bool]:
return rng.choice(operators) #type: ignore[arg-type] # signature of "choice" does not accept a list of callables but works fine in practice
return rng.choice(operators) # type: ignore[arg-type] # signature of "choice" does not accept a list of callables but works fine in practice


def _random_operator(
data_type: str, rng: np.random.Generator
) -> Callable[[Any, Any], Union[bool, pl.Expr]]:
def _random_operator(data_type: str, rng: np.random.Generator) -> Callable[[Any, Any], Union[bool, pl.Expr]]:
if data_type in ["categorical", "boolean"]:
ops: Sequence[Callable[[Any, Any], bool]] = [operator.eq, operator.ne]
elif data_type == "numerical":
Expand Down Expand Up @@ -143,32 +177,20 @@ def _random_queries(
rng: np.random.Generator,
) -> list[pl.Expr]:
unique_values = {col: df[col].unique().to_list() for col in df.columns}
column_types = {
col: _convert_polars_dtype(df[col].dtype)
for col in df.columns
}
column_types = {col: _convert_polars_dtype(df[col].dtype) for col in df.columns}

queries = []
for _ in range(n_queries):
selected_cols = rng.choice(
df.columns, size=n_cols, replace=False
).tolist()
selected_cols = rng.choice(df.columns, size=n_cols, replace=False).tolist()

queries.append(
_random_query(
unique_values=unique_values,
cols=selected_cols,
column_types=column_types,
rng=rng
)
_random_query(unique_values=unique_values, cols=selected_cols, column_types=column_types, rng=rng)
)

return queries


def singling_out_probability_integral(
n: int, w_min: float, w_max: float
) -> float:
def singling_out_probability_integral(n: int, w_min: float, w_max: float) -> float:
"""Integral of the singling out probability within a given range.

The probability that a query singles out in a population of size
Expand Down Expand Up @@ -198,18 +220,14 @@ def singling_out_probability_integral(

"""
if w_min < 0 or w_min > 1:
raise ValueError(
f"Parameter `w_min` must be between 0 and 1. Got {w_min} instead."
)
raise ValueError(f"Parameter `w_min` must be between 0 and 1. Got {w_min} instead.")

if w_max < w_min or w_max > 1:
raise ValueError(
f"Parameter `w_max` must be greater than w_min ({w_min}) and smaller than 1. Got {w_max} instead."
)

return (
(n * w_min + 1) * (1 - w_min) ** n - (n * w_max + 1) * (1 - w_max) ** n
) / (n + 1)
return ((n * w_min + 1) * (1 - w_min) ** n - (n * w_max + 1) * (1 - w_max) ** n) / (n + 1)


def _measure_queries_success(
Expand All @@ -233,9 +251,7 @@ def _model(x, w_eff, norm):
def _fit_model(sizes: npt.NDArray, successes: npt.NDArray) -> Callable:
# initial guesses
w_eff_guess = 1 / np.max(sizes)
norm_guess = 1 / singling_out_probability_integral(
n=np.max(sizes), w_min=0, w_max=w_eff_guess
)
norm_guess = 1 / singling_out_probability_integral(n=np.max(sizes), w_min=0, w_max=w_eff_guess)

popt, _ = curve_fit(
_model,
Expand Down Expand Up @@ -265,9 +281,7 @@ def fit_correction_term(df: pl.DataFrame, queries: list[pl.Expr]) -> Callable:
depends on the size of the dataset.

"""
sizes, successes = _measure_queries_success(
df=df, queries=queries, n_repeat=5, n_meas=10
)
sizes, successes = _measure_queries_success(df=df, queries=queries, n_repeat=5, n_meas=10)
return _fit_model(sizes=sizes, successes=successes)


Expand Down Expand Up @@ -323,9 +337,7 @@ def queries(self) -> list[pl.Expr]:
return self._list


def univariate_singling_out_queries(
df: pl.DataFrame, n_queries: int, rng: np.random.Generator
) -> list[pl.Expr]:
def univariate_singling_out_queries(df: pl.DataFrame, n_queries: int, rng: np.random.Generator) -> list[pl.Expr]:
"""Generate singling out queries from rare attributes.

Parameters
Expand Down Expand Up @@ -374,7 +386,7 @@ def univariate_singling_out_queries(
if len(rare_values) > 0:
queries.extend([pl.col(col) == val for val in rare_values])

rng.shuffle(queries) #type: ignore[arg-type] # signature of "shuffle" does not accept a list of expressions but works fine in practice
rng.shuffle(queries) # type: ignore[arg-type] # signature of "shuffle" does not accept a list of expressions but works fine in practice

unique_so_queries = UniqueSinglingOutQueries(max_size=n_queries)
unique_so_queries.check_and_extend(queries, df)
Expand Down Expand Up @@ -444,18 +456,13 @@ def multivariate_singling_out_queries(
# Generate a batch of queries

# Pre-sample all random row indices
random_indices = rng.integers(
low=0, high=df.shape[0], size=batch_size
)
random_indices = rng.integers(low=0, high=df.shape[0], size=batch_size)

# Extract all records in bulk
records = df[random_indices].to_dicts()

# Pre-sample all column choices
selected_columns = [
rng.choice(df.columns, size=n_cols, replace=False).tolist()
for _ in range(batch_size)
]
selected_columns = [rng.choice(df.columns, size=n_cols, replace=False).tolist() for _ in range(batch_size)]

queries_batch = [
_query_from_record(
Expand All @@ -478,25 +485,16 @@ def multivariate_singling_out_queries(
return unique_so_queries.queries


def _evaluate_queries(df: pl.DataFrame, queries: list[pl.Expr]) -> tuple[int, ...]:
    """Count, for each query expression, how many rows of *df* satisfy it.

    The span in SOURCE interleaved pre- and post-reformat diff lines
    (duplicate signature and duplicate `result_df` assignment), which is not
    valid Python; this is the cleaned final version.

    Parameters
    ----------
    df : pl.DataFrame
        Dataframe the boolean query expressions are evaluated against.
    queries : list[pl.Expr]
        Boolean polars expressions; each is summed (True -> 1) to a count.

    Returns
    -------
    tuple[int, ...]
        One match count per query, in input order; empty tuple for no queries.

    """
    if len(queries) == 0:
        return ()

    # Evaluate all queries in a single `select` so polars runs them in one pass.
    result_df = df.select([q.cast(pl.Int64).sum().alias(f"count_{i}") for i, q in enumerate(queries)])
    counts = result_df.row(0)
    return counts


def _evaluate_queries_and_return_successful(
df: pl.DataFrame, queries: list[pl.Expr]
) -> list[pl.Expr]:
def _evaluate_queries_and_return_successful(df: pl.DataFrame, queries: list[pl.Expr]) -> list[pl.Expr]:
counts = _evaluate_queries(df=df, queries=queries)

counts_np = np.array(counts, dtype=float)
Expand All @@ -520,9 +518,7 @@ def _generate_singling_out_queries(
rng: np.random.Generator,
) -> list[pl.Expr]:
if mode == "univariate":
queries = univariate_singling_out_queries(
df=df, n_queries=n_attacks, rng=rng
)
queries = univariate_singling_out_queries(df=df, n_queries=n_attacks, rng=rng)

elif mode == "multivariate":
queries = multivariate_singling_out_queries(
Expand All @@ -534,9 +530,7 @@ def _generate_singling_out_queries(
)

else:
raise RuntimeError(
f"Parameter `mode` can be either `univariate` or `multivariate`. Got {mode} instead."
)
raise RuntimeError(f"Parameter `mode` can be either `univariate` or `multivariate`. Got {mode} instead.")

if len(queries) < n_attacks:
logger.warning(
Expand Down Expand Up @@ -604,16 +598,16 @@ def __init__(
max_attempts: Optional[int] = 10000000,
seed: Optional[int] = None,
):
ori = pl.DataFrame(ori)
syn = pl.DataFrame(syn)
ori = pl.DataFrame(_safe_column_names(ori))
syn = pl.DataFrame(_safe_column_names(syn))
self._ori = ori.unique(maintain_order=True)
self._syn = syn.unique(maintain_order=True)
self._n_attacks = n_attacks
self._n_cols = n_cols
if control is None:
self._control = None
else:
control = pl.DataFrame(control)
control = pl.DataFrame(_safe_column_names(control))
self._control = control.unique(maintain_order=True)
self._max_attempts = max_attempts
self._queries: list[pl.Expr] = []
Expand Down Expand Up @@ -659,9 +653,7 @@ def evaluate(self, mode: str = "multivariate") -> "SinglingOutEvaluator":
elif mode == "univariate":
n_cols = 1
else:
raise ValueError(
f"mode must be either 'multivariate' or 'univariate', got {mode} instead."
)
raise ValueError(f"mode must be either 'multivariate' or 'univariate', got {mode} instead.")

queries = _generate_singling_out_queries(
df=self._syn,
Expand All @@ -671,9 +663,7 @@ def evaluate(self, mode: str = "multivariate") -> "SinglingOutEvaluator":
max_attempts=self._max_attempts,
rng=self._rng,
)
self._queries = _evaluate_queries_and_return_successful(
df=self._ori, queries=queries
)
self._queries = _evaluate_queries_and_return_successful(df=self._ori, queries=queries)
self._n_success = len(self._queries)

baseline_queries = _random_queries(
Expand All @@ -682,31 +672,21 @@ def evaluate(self, mode: str = "multivariate") -> "SinglingOutEvaluator":
n_cols=n_cols,
rng=self._rng,
)
self._baseline_queries = _evaluate_queries_and_return_successful(
df=self._ori, queries=baseline_queries
)
self._baseline_queries = _evaluate_queries_and_return_successful(df=self._ori, queries=baseline_queries)
self._n_baseline = len(self._baseline_queries)

if self._control is None:
self._n_control = None
else:
self._n_control = len(
_evaluate_queries_and_return_successful(
df=self._control, queries=queries
)
)
self._n_control = len(_evaluate_queries_and_return_successful(df=self._control, queries=queries))

# correct the number of success against the control set
# to account for different dataset sizes.
if len(self._control) != len(self._ori):
# fit the model to the data:
fitted_model = fit_correction_term(
df=self._control, queries=queries
)
fitted_model = fit_correction_term(df=self._control, queries=queries)

correction = fitted_model(len(self._ori)) / fitted_model(
len(self._control)
)
correction = fitted_model(len(self._ori)) / fitted_model(len(self._control))
self._n_control *= correction

self._evaluated = True
Expand All @@ -727,9 +707,7 @@ def results(self, confidence_level: float = 0.95) -> EvaluationResults:

"""
if not self._evaluated:
raise RuntimeError(
"The singling out evaluator wasn't evaluated yet. Please, run `evaluate()` first."
)
raise RuntimeError("The singling out evaluator wasn't evaluated yet. Please, run `evaluate()` first.")

return EvaluationResults(
n_attacks=self._n_attacks,
Expand All @@ -739,9 +717,7 @@ def results(self, confidence_level: float = 0.95) -> EvaluationResults:
confidence_level=confidence_level,
)

def risk(
self, confidence_level: float = 0.95, baseline: bool = False
) -> PrivacyRisk:
def risk(self, confidence_level: float = 0.95, baseline: bool = False) -> PrivacyRisk:
"""Estimate the singling out risk.

The risk is estimated comparing the number of successfull singling out
Expand Down
Loading