import math
import torch
import torch.nn.functional as F
from scipy.special import erf


class Mechanism:
    """Base class for local differential privacy mechanisms.

    `input_range` is an (alpha, beta) pair bounding each input feature.
    """

    def __init__(self, eps, input_range, **kwargs):
        self.eps = eps
        self.alpha, self.beta = input_range

    def __call__(self, x):
        raise NotImplementedError


class Gaussian(Mechanism):
    def __init__(self, *args, delta=1e-7, **kwargs):
        super().__init__(*args, **kwargs)
        self.delta = delta
        self.sigma = None
        self.sensitivity = None

    def __call__(self, x):
        len_interval = self.beta - self.alpha
        if torch.is_tensor(len_interval) and len_interval.numel() > 1:
            # per-dimension ranges: L2 sensitivity is the norm of the range vector
            self.sensitivity = torch.norm(len_interval, p=2)
        else:
            # a single shared range: L2 sensitivity over all d dimensions
            d = x.size(1)
            self.sensitivity = len_interval * math.sqrt(d)
        # cast to float so torch.normal accepts it even when the sensitivity
        # was computed as a 0-dim tensor
        self.sigma = float(self.calibrate_gaussian_mechanism())
        out = torch.normal(mean=x, std=self.sigma)
        # out = torch.clip(out, min=self.alpha, max=self.beta)
        return out

    def calibrate_gaussian_mechanism(self):
        # classical calibration: sigma = sqrt(2 ln(1.25 / delta)) * sensitivity / eps
        return self.sensitivity * math.sqrt(2 * math.log(1.25 / self.delta)) / self.eps
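

# Usage sketch (illustrative addition, not part of the original module): perturb
# a batch of features assumed to be pre-scaled to [0, 1]. The shapes and
# parameter values below are arbitrary demo choices.
def _demo_gaussian():
    mech = Gaussian(eps=1.0, input_range=(0.0, 1.0), delta=1e-7)
    x = torch.rand(5, 3)  # 5 samples, 3 features in [0, 1]
    return mech(x)        # perturbed tensor with the same shape as x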


class AnalyticGaussian(Gaussian):
    def calibrate_gaussian_mechanism(self, tol=1e-12):
        """Calibrate a Gaussian perturbation for differential privacy using the
        analytic Gaussian mechanism of [Balle and Wang, ICML'18].

        Arguments:
            tol : error tolerance for the binary search (tol > 0)

        Returns:
            sigma : standard deviation of the Gaussian noise needed to achieve
                (eps, delta)-DP under the global sensitivity self.sensitivity
        """
        delta_thr = self._case_a(0.0)
        if self.delta == delta_thr:
            alpha = 1.0
        else:
            if self.delta > delta_thr:
                predicate_stop_DT = lambda s: self._case_a(s) >= self.delta
                function_s_to_delta = lambda s: self._case_a(s)
                predicate_left_BS = lambda s: function_s_to_delta(s) > self.delta
                function_s_to_alpha = lambda s: math.sqrt(1.0 + s / 2.0) - math.sqrt(s / 2.0)
            else:
                predicate_stop_DT = lambda s: self._case_b(s) <= self.delta
                function_s_to_delta = lambda s: self._case_b(s)
                predicate_left_BS = lambda s: function_s_to_delta(s) < self.delta
                function_s_to_alpha = lambda s: math.sqrt(1.0 + s / 2.0) + math.sqrt(s / 2.0)
            predicate_stop_BS = lambda s: abs(function_s_to_delta(s) - self.delta) <= tol
            # bracket the solution by doubling, then refine with binary search
            s_inf, s_sup = self._doubling_trick(predicate_stop_DT, 0.0, 1.0)
            s_final = self._binary_search(predicate_stop_BS, predicate_left_BS, s_inf, s_sup)
            alpha = function_s_to_alpha(s_final)
        sigma = alpha * self.sensitivity / math.sqrt(2.0 * self.eps)
        return sigma

    @staticmethod
    def _phi(t):
        # standard normal CDF
        return 0.5 * (1.0 + erf(t / math.sqrt(2.0)))

    def _case_a(self, s):
        return self._phi(math.sqrt(self.eps * s)) - math.exp(self.eps) * self._phi(-math.sqrt(self.eps * (s + 2.0)))

    def _case_b(self, s):
        return self._phi(-math.sqrt(self.eps * s)) - math.exp(self.eps) * self._phi(-math.sqrt(self.eps * (s + 2.0)))

    @staticmethod
    def _doubling_trick(predicate_stop, s_inf, s_sup):
        while not predicate_stop(s_sup):
            s_inf = s_sup
            s_sup = 2.0 * s_inf
        return s_inf, s_sup

    @staticmethod
    def _binary_search(predicate_stop, predicate_left, s_inf, s_sup):
        s_mid = s_inf + (s_sup - s_inf) / 2.0
        while not predicate_stop(s_mid):
            if predicate_left(s_mid):
                s_sup = s_mid
            else:
                s_inf = s_mid
            s_mid = s_inf + (s_sup - s_inf) / 2.0
        return s_mid
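

# Usage sketch (illustrative addition, not part of the original module). The
# analytic calibration generally yields a smaller sigma than the classical
# bound and, unlike it, remains valid for eps > 1. Parameter values are demo
# choices.
def _demo_analytic_gaussian():
    mech = AnalyticGaussian(eps=2.0, input_range=(0.0, 1.0), delta=1e-7)
    x = torch.rand(5, 3)
    y = mech(x)              # calling the mechanism also sets mech.sigma
    return y, mech.sigma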


class Piecewise(Mechanism):
    def __call__(self, x):
        # normalize x into [-1, 1]
        t = (x - self.alpha) / (self.beta - self.alpha)
        t = 2 * t - 1
        # constants of the piecewise mechanism
        P = (math.exp(self.eps) - math.exp(self.eps / 2)) / (2 * math.exp(self.eps / 2) + 2)
        C = (math.exp(self.eps / 2) + 1) / (math.exp(self.eps / 2) - 1)
        L = t * (C + 1) / 2 - (C - 1) / 2
        R = L + C - 1
        # thresholds for random sampling
        threshold_left = P * (L + C) / math.exp(self.eps)
        threshold_right = threshold_left + P * (R - L)
        # masks for piecewise random sampling (u is uniform in [0, 1))
        u = torch.rand_like(t)
        mask_left = u < threshold_left
        mask_middle = (threshold_left < u) & (u < threshold_right)
        mask_right = threshold_right < u
        # sample uniformly from the sub-interval selected by each mask
        t = mask_left * (torch.rand_like(t) * (L + C) - C)
        t += mask_middle * (torch.rand_like(t) * (R - L) + L)
        t += mask_right * (torch.rand_like(t) * (C - R) + R)
        # map back from [-1, 1] to [alpha, beta]
        x_prime = (self.beta - self.alpha) * (t + 1) / 2 + self.alpha
        return x_prime
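

# Usage sketch (illustrative addition, not part of the original module). The
# piecewise mechanism is unbiased, so averaging many perturbed copies of the
# same value should land close to that value; 0.3 is an arbitrary demo choice.
def _demo_piecewise():
    mech = Piecewise(eps=4.0, input_range=(0.0, 1.0))
    x = torch.full((10000, 1), 0.3)   # many copies of the same value
    x_prime = mech(x)
    return x_prime.mean()             # expected to be close to 0.3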


class MultiDimPiecewise(Piecewise):
    def __call__(self, x):
        n, d = x.size()
        # number of dimensions to perturb per sample
        k = int(max(1, min(d, math.floor(self.eps / 2.5))))
        # pick k dimensions uniformly at random for every row
        sample = torch.rand_like(x).topk(k, dim=1).indices
        mask = torch.zeros_like(x, dtype=torch.bool)
        mask.scatter_(1, sample, True)
        # split the budget across the k sampled dimensions; restore eps
        # afterwards so repeated calls do not keep shrinking the budget
        eps_total = self.eps
        self.eps = eps_total / k
        try:
            y = super().__call__(x)
        finally:
            self.eps = eps_total
        # zero out unsampled dimensions and rescale by d / k
        z = mask * y * d / k
        return z
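

# Usage sketch (illustrative addition, not part of the original module). Only
# k randomly chosen dimensions per row are perturbed; the d / k rescaling is
# meant to keep the column means roughly unbiased. Shapes are demo choices.
def _demo_multidim_piecewise():
    mech = MultiDimPiecewise(eps=5.0, input_range=(0.0, 1.0))
    x = torch.rand(1000, 8)           # 8-dimensional features in [0, 1]
    z = mech(x)                       # zeros outside the sampled dimensions
    return z.mean(dim=0)              # should approximate x.mean(dim=0)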


class RandomizedResponseTopK:
    def __init__(self, eps, k):
        """
        Args:
            eps: privacy budget parameter epsilon
            k: limits choices to the top-k items having the highest probabilities;
                either an int shared by all rows or a (N, 1) tensor of per-row values
        """
        self.eps = eps
        self.k = k

    def __call__(self, y, p):
        """
        Perturbs the input vector y with prior probabilities indicated by p.

        Args:
            y: tensor with shape (N, 1) with one private label per row
            p: tensor with shape (N, D) with one row per label indicating the
                prior distribution over D possible outcomes

        Returns:
            perturbed labels
        """
        # add a small random noise to break ties randomly (out of place, so
        # the caller's prior is left untouched)
        p = p + torch.randn_like(p) / 1e4
        # per-row k as a (N, 1) long tensor, whether self.k is an int or a tensor
        k = self.k
        if not torch.is_tensor(k):
            k = torch.full((p.size(0), 1), k, dtype=torch.long, device=p.device)
        k = k.long()
        # select the top-k items of each row
        kth_max = p.sort(dim=1, descending=True).values.gather(1, k - 1)
        included = p >= kth_max
        # apply randomized response on the top-k items
        p_incorrect = 1.0 / (math.exp(self.eps) + self.k - 1)
        pr = torch.ones_like(p) * p_incorrect
        y = y.unsqueeze(dim=1) if len(y.size()) == 1 else y
        pr.scatter_(1, y, math.exp(self.eps) * p_incorrect)
        pr.mul_(included)
        pr.mul_(1.0 / pr.sum(dim=1).unsqueeze(1))
        return torch.multinomial(pr, num_samples=1).squeeze()
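

# Usage sketch (illustrative addition, not part of the original module).
# Labels are perturbed by randomized response restricted to the k classes with
# the highest prior probability per row; the prior values here are made up.
def _demo_rr_topk():
    y = torch.tensor([2, 0, 1])                 # private labels, D = 4 classes
    p = torch.tensor([[0.1, 0.2, 0.6, 0.1],
                      [0.7, 0.1, 0.1, 0.1],
                      [0.2, 0.5, 0.2, 0.1]])
    mech = RandomizedResponseTopK(eps=1.0, k=2)
    return mech(y, p)                           # perturbed labels, shape (3,)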


class RandomizedResponseWithPrior:
    def __init__(self, eps):
        """
        Args:
            eps: privacy budget parameter epsilon
        """
        self.eps = eps

    def __call__(self, y, p):
        """
        Perturbs the input vector y with prior probabilities indicated by p.

        Args:
            y: tensor with shape (N, 1) with one private label per row
            p: tensor with shape (N, D) with one row per label indicating the
                prior distribution over D possible outcomes

        Returns:
            perturbed labels
        """
        # add a small random noise to break ties randomly
        p_noisy = p + (torch.randn_like(p) / 1e4)
        # select the best value of k for RRTop-K per row: maximize the chance
        # that the reported label is correct and the true label is in the top-k
        n, d = p.size()
        k = torch.arange(1, d + 1, dtype=p.dtype, device=y.device).expand(n, d)
        p_noisy = p_noisy.sort(dim=1, descending=True).values
        w = math.exp(self.eps) / (math.exp(self.eps) + k - 1)
        w = w * p_noisy.cumsum(dim=1)
        k = w.argmax(dim=1, keepdim=True) + 1
        return RandomizedResponseTopK(eps=self.eps, k=k)(y, p)
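

# Usage sketch (illustrative addition, not part of the original module). Here
# k is chosen automatically per row from the prior before applying RRTop-K;
# the labels and prior values are made-up demo data.
def _demo_rr_with_prior():
    y = torch.tensor([2, 0, 1])
    p = torch.tensor([[0.1, 0.2, 0.6, 0.1],
                      [0.7, 0.1, 0.1, 0.1],
                      [0.2, 0.5, 0.2, 0.1]])
    mech = RandomizedResponseWithPrior(eps=1.0)
    return mech(y, p)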


class OptimizedUnaryEncoding:
    def __init__(self, eps, d):
        self.d = d
        # OUE parameters: keep a 1-bit with probability p = 1/2 and flip a
        # 0-bit on with probability q = 1 / (e^eps + 1)
        self.p = 0.5
        self.q = 1 / (math.exp(eps) + 1)

    def __call__(self, y):
        # y is a one-hot (binary) matrix; each bit is randomized independently
        pr = y * self.p + (1 - y) * self.q
        return torch.bernoulli(pr)

    def estimate(self, b):
        # unbiased frequency estimate from the aggregated perturbed bits
        n = b.size(0)
        return (b.sum(dim=0) / n - self.q) / (self.p - self.q)
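

# Usage sketch (illustrative addition, not part of the original module). Each
# label is one-hot encoded, every bit is randomized independently, and
# `estimate` recovers approximate class frequencies from the perturbed bits.
# The number of classes and samples are demo choices.
def _demo_oue():
    d = 4
    labels = torch.randint(0, d, (10000,))
    onehot = F.one_hot(labels, num_classes=d).float()
    mech = OptimizedUnaryEncoding(eps=1.0, d=d)
    b = mech(onehot)                  # randomized bit vectors
    return mech.estimate(b)           # approximate class frequencies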