Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ dmypy.json
# Pyre type checker
.pyre/

# vscode
# IDE
.vscode/
.idea/

**/*DS_Store*
**/*DS_Store*
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,3 @@ This `bibtex` entry can be used to refer to the paper:
### License

Licensed under Clear BSD License, see `LICENSE.md` to see the full license text. Patent-pending code (application US-20230401336-A1).

144 changes: 119 additions & 25 deletions src/anonymeter/evaluators/inference_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import numpy.typing as npt
import pandas as pd

from anonymeter.neighbors.mixed_types_kneighbors import MixedTypeKNeighbors
from anonymeter.evaluators.inference_predictor import InferencePredictor
from anonymeter.neighbors.mixed_types_kneighbors import KNNInferencePredictor
from anonymeter.stats.confidence import EvaluationResults, PrivacyRisk


Expand All @@ -22,25 +23,26 @@ def _run_attack(
n_jobs: int,
naive: bool,
regression: Optional[bool],
) -> int:
inference_model: Optional[InferencePredictor],
) -> tuple[int, pd.Series]:
if regression is None:
regression = pd.api.types.is_numeric_dtype(target[secret])

targets = target.sample(n_attacks, replace=False)

if naive:
guesses = syn.sample(n_attacks)[secret]

guesses.index = targets.index
else:
nn = MixedTypeKNeighbors(n_jobs=n_jobs, n_neighbors=1).fit(candidates=syn[aux_cols])
# Instantiate the default KNN model if no other model is passed through `inference_model`.
if inference_model is None:
inference_model = KNNInferencePredictor(data=syn, columns=aux_cols, target_col=secret, n_jobs=n_jobs)

guesses_idx = nn.kneighbors(queries=targets[aux_cols])
if isinstance(guesses_idx, tuple):
raise RuntimeError("guesses_idx cannot be a tuple")
guesses = inference_model.predict(targets)

guesses = syn.iloc[guesses_idx.flatten()][secret]
if not guesses.index.equals(targets.index):
raise RuntimeError("The predictions indices do not match the target indices. Check your inference model.")

return evaluate_inference_guesses(guesses=guesses, secrets=targets[secret], regression=regression).sum()
return evaluate_inference_guesses(guesses=guesses, secrets=targets[secret], regression=regression).sum(), guesses


def evaluate_inference_guesses(
Expand Down Expand Up @@ -142,23 +144,35 @@ class InferenceEvaluator:
the variable.
n_attacks : int, default is 500
Number of attack attempts.
inference_model : InferencePredictor, optional
An ML model fitted on `syn` as training data with `secret` as target, that supports `predict(x)`.
If not None, it is used instead of the default MixedTypeKNeighbors model in the attack.

"""

def __init__(
self,
ori: pd.DataFrame,
syn: pd.DataFrame,
aux_cols: list[str],
secret: str,
regression: Optional[bool] = None,
n_attacks: int = 500,
control: Optional[pd.DataFrame] = None,
self,
ori: pd.DataFrame,
syn: pd.DataFrame,
aux_cols: list[str],
secret: str,
regression: bool = False,
n_attacks: int = 500,
control: Optional[pd.DataFrame] = None,
inference_model: Optional[InferencePredictor] = None
):
self._ori = ori
self._syn = syn
self._control = control
self._n_attacks = n_attacks
self._inference_model = inference_model

self._n_attacks_ori = min(n_attacks, self._ori.shape[0])
self._n_attacks_baseline = min(self._syn.shape[0], self._n_attacks_ori)
if self._control is None:
self._n_attacks_control = -1
else:
self._n_attacks_control = min(n_attacks, self._control.shape[0])

# check if secret is a string column
if not isinstance(secret, str):
Expand All @@ -173,16 +187,17 @@ def __init__(
self._aux_cols = aux_cols
self._evaluated = False

def _attack(self, target: pd.DataFrame, naive: bool, n_jobs: int) -> int:
def _attack(self, target: pd.DataFrame, naive: bool, n_jobs: int, n_attacks: int) -> tuple[int, pd.Series]:
return _run_attack(
target=target,
syn=self._syn,
n_attacks=self._n_attacks,
n_attacks=n_attacks,
aux_cols=self._aux_cols,
secret=self._secret,
n_jobs=n_jobs,
naive=naive,
regression=self._regression,
inference_model=self._inference_model,
)

def evaluate(self, n_jobs: int = -2) -> "InferenceEvaluator":
Expand All @@ -199,10 +214,20 @@ def evaluate(self, n_jobs: int = -2) -> "InferenceEvaluator":
The evaluated ``InferenceEvaluator`` object.

"""
self._n_baseline = self._attack(target=self._ori, naive=True, n_jobs=n_jobs)
self._n_success = self._attack(target=self._ori, naive=False, n_jobs=n_jobs)
self._n_control = (
None if self._control is None else self._attack(target=self._control, naive=False, n_jobs=n_jobs)
# Baseline attack with naive (random) guesses, capped at self._n_attacks_baseline attempts.
self._n_baseline, self._guesses_baseline = self._attack(
target=self._ori, naive=True, n_jobs=n_jobs, n_attacks=self._n_attacks_baseline
)

# Main attack on the original data, capped at the number of available original records.
self._n_success, self._guesses_success = self._attack(
target=self._ori, naive=False, n_jobs=n_jobs, n_attacks=self._n_attacks_ori
)
# Control attack, capped at the number of available control records.
self._n_control, self._guesses_control = (
(None, None)
if self._control is None
else self._attack(target=self._control, naive=False, n_jobs=n_jobs, n_attacks=self._n_attacks_control)
)

self._evaluated = True
Expand All @@ -226,7 +251,7 @@ def results(self, confidence_level: float = 0.95) -> EvaluationResults:
raise RuntimeError("The inference evaluator wasn't evaluated yet. Please, run `evaluate()` first.")

return EvaluationResults(
n_attacks=self._n_attacks,
n_attacks=(self._n_attacks_ori, self._n_attacks_baseline, self._n_attacks_control),
n_success=self._n_success,
n_baseline=self._n_baseline,
n_control=self._n_control,
Expand Down Expand Up @@ -258,3 +283,72 @@ def risk(self, confidence_level: float = 0.95, baseline: bool = False) -> Privac
"""
results = self.results(confidence_level=confidence_level)
return results.risk(baseline=baseline)

def risk_for_groups(self, confidence_level: float = 0.95) -> dict[str, tuple[EvaluationResults, PrivacyRisk]]:
    """Compute the attack risks per group, for every unique value of the secret column.

    Runs ``evaluate()`` first if it has not been called yet, then slices the
    recorded guesses by the true value of the secret and re-derives the
    evaluation results and the risk for each group.

    Parameters
    ----------
    confidence_level : float, default is 0.95
        Confidence level for the error bound calculation.

    Returns
    -------
    dict[str, tuple[EvaluationResults, PrivacyRisk]]
        The group as a key, and then for every group the results
        (EvaluationResults) and the risks (PrivacyRisk) as a tuple.

    """
    if not self._evaluated:
        self.evaluate(n_jobs=-2)

    all_results = {}

    # For every unique value of the secret column in the original data.
    for group, data_ori in self._ori.groupby(self._secret):
        # Restrict the group to the rows that were actually attacked.
        common_indices = data_ori.index.intersection(self._guesses_success.index)
        data_ori = data_ori.loc[common_indices]
        n_attacks_ori = len(data_ori)

        # Count the number of successful attacks within this group.
        n_success = evaluate_inference_guesses(
            guesses=self._guesses_success.loc[common_indices],
            secrets=data_ori[self._secret],
            regression=self._regression,
        ).sum()

        if self._control is not None:
            # Get the control rows belonging to the current group, restricted
            # to the rows that were actually attacked. Restricting *before*
            # counting keeps guesses and secrets aligned one-to-one.
            data_control = self._control[self._control[self._secret] == group]
            common_indices = data_control.index.intersection(self._guesses_control.index)
            data_control = data_control.loc[common_indices]
            n_attacks_control = len(data_control)

            # Count the number of successful control attacks.
            n_control = evaluate_inference_guesses(
                guesses=self._guesses_control.loc[common_indices],
                secrets=data_control[self._secret],
                regression=self._regression,
            ).sum()
        else:
            n_control = None
            n_attacks_control = -1

        # Recreate the EvaluationResults for the current group. The baseline
        # is not sliced: naive random guessing does not depend on the group.
        results = EvaluationResults(
            n_attacks=(n_attacks_ori, self._n_attacks_baseline, n_attacks_control),
            n_success=n_success,
            n_baseline=self._n_baseline,
            n_control=n_control,
            confidence_level=confidence_level,
        )
        # Compute the risk for this group.
        risk = results.risk()

        all_results[group] = (results, risk)

    return all_results
8 changes: 8 additions & 0 deletions src/anonymeter/evaluators/inference_predictor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import Protocol

import pandas as pd


class InferencePredictor(Protocol):
    """Structural interface for models usable by the inference evaluator.

    Any object that exposes a ``predict`` method with this signature
    satisfies the protocol; no explicit subclassing is required.
    """

    def predict(self, X: pd.DataFrame) -> pd.Series:
        """Return one prediction per row of ``X`` as a Series."""
        ...
39 changes: 39 additions & 0 deletions src/anonymeter/evaluators/sklearn_inference_predictor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pandas as pd
from sklearn.base import BaseEstimator, is_classifier, is_regressor

from anonymeter.evaluators.inference_predictor import InferencePredictor


class SklearnInferencePredictor(InferencePredictor):
    """Wrapper class to use sklearn methods in the inference evaluator.

    Parameters
    ----------
    model : sklearn.base.BaseEstimator
        A classifier or regressor which implements ``predict()``.

    Raises
    ------
    ValueError
        If ``model`` is neither a classifier nor a regressor, or does
        not expose a ``predict`` method.

    """

    def __init__(self, model: BaseEstimator):
        # Note: `ValueError("msg %s", model)` does NOT format the message —
        # lazy %-style args are a logging-only convention. Exceptions need
        # the message formatted up front.
        if not (is_classifier(estimator=model) or is_regressor(estimator=model)):
            raise ValueError(f"Model must be classifier or regressor: {model}")
        if not hasattr(model, "predict"):
            raise ValueError(f"Model must have a predict method: {model}")
        self._model = model

    def predict(self, x: pd.DataFrame) -> pd.Series:
        """Predict the targets for input `x`.

        Parameters
        ----------
        x : pd.DataFrame
            The input data to predict.

        Returns
        -------
        pd.Series
            The predictions as pd.Series, indexed like ``x``.

        """
        prediction = self._model.predict(x)
        # Re-index on the query index so the evaluator can compare the
        # guesses to the targets row-by-row.
        return pd.Series(prediction, index=x.index)
50 changes: 47 additions & 3 deletions src/anonymeter/neighbors/mixed_types_kneighbors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from joblib import Parallel, delayed
from numba import jit

from anonymeter.evaluators.inference_predictor import InferencePredictor
from anonymeter.preprocessing.transformations import mixed_types_transform
from anonymeter.preprocessing.type_detection import detect_consistent_col_types

Expand Down Expand Up @@ -75,7 +76,7 @@ def gower_distance(r0: npt.NDArray, r1: npt.NDArray, cat_cols_index: int) -> flo

@jit(nopython=True, nogil=True)
def _nearest_neighbors(
queries: npt.NDArray, candidates: npt.NDArray, cat_cols_index: int, n_neighbors: int
queries: npt.NDArray, candidates: npt.NDArray, cat_cols_index: int, n_neighbors: int
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.float64]]:
r"""For every element of ``queries``, find its nearest neighbors in ``candidates``.

Expand Down Expand Up @@ -166,7 +167,7 @@ def fit(self, candidates: pd.DataFrame, ctypes: Optional[dict[str, list[str]]] =
return self

def kneighbors(
self, queries: pd.DataFrame, n_neighbors: Optional[int] = None, return_distance: bool = False
self, queries: pd.DataFrame, n_neighbors: Optional[int] = None, return_distance: bool = False
) -> Union[tuple[npt.NDArray, npt.NDArray], npt.NDArray]:
"""Find the nearest neighbors for a set of query points.

Expand Down Expand Up @@ -220,7 +221,7 @@ def kneighbors(
with Parallel(n_jobs=self._n_jobs, backend="threading") as executor:
res = executor(
delayed(_nearest_neighbors)(
queries=queries[ii : ii + 1],
queries=queries[ii: ii + 1],
candidates=candidates,
cat_cols_index=len(self._ctypes["num"]),
n_neighbors=n_neighbors,
Expand All @@ -235,3 +236,46 @@ def kneighbors(
return distances, indexes

return indexes


class KNNInferencePredictor(InferencePredictor):
    """Wrapper class to use MixedTypeKNeighbors in the inference evaluator.

    Parameters
    ----------
    data : pd.DataFrame
        The train data to fit the model on (usually the synthetic data).
    columns : list[str]
        The auxiliary columns of `data`, used as input to the model.
    target_col : str
        The target column of `data`.
    n_jobs : int
        Number of jobs to use. It follows joblib convention, so that ``n_jobs = -1``
        means all available cores

    """

    def __init__(self, data: pd.DataFrame, columns: list[str], target_col: str, n_jobs: int):
        # Fit a 1-NN model on the auxiliary columns only; the target column
        # is looked up from `data` at prediction time.
        self._nn = MixedTypeKNeighbors(n_jobs=n_jobs, n_neighbors=1).fit(candidates=data[columns])
        self._data = data
        self._target_col = target_col
        self._columns = columns

    def predict(self, x: pd.DataFrame) -> pd.Series:
        """Predict the targets for input `x`.

        Parameters
        ----------
        x : pd.DataFrame
            The input data to predict.

        Returns
        -------
        pd.Series
            The predictions as pd.Series, indexed like ``x``.

        """
        guesses_idx = self._nn.kneighbors(queries=x[self._columns])
        if isinstance(guesses_idx, tuple):
            raise RuntimeError("guesses_idx cannot be a tuple")
        guesses = self._data.iloc[guesses_idx.flatten()][self._target_col]
        # Re-index on the query index: `.iloc` carries over the training-data
        # index, and the evaluator rejects guesses whose index differs from
        # the targets' index.
        guesses.index = x.index
        return guesses
Loading