split get_calibrated_prob and get_calibrated_probs
avalanchesiqi committed Nov 30, 2024
1 parent 2714dfc commit 01779f5
Showing 6 changed files with 132 additions and 274 deletions.
302 changes: 85 additions & 217 deletions icwsm_tutorial/Alternate_Calibration_Curves.ipynb

Large diffs are not rendered by default.

70 changes: 30 additions & 40 deletions pyquantifier/calibration_curve.py
@@ -8,24 +8,18 @@

 class CalibrationCurve:
     """
-    A calibration curve.
+    Implementation of a calibration curve.
     """
     def __init__(self):
         self.num_bin = 100
         self.x_axis = np.arange(0.05/self.num_bin, 1, 1/self.num_bin)
-        self.y_axis = self.get_calibrated_prob(self.x_axis)
+        self.y_axis = self.get_calibrated_probs(self.x_axis)
 
-    def get_calibrated_prob(self, cxs):
+    def get_calibrated_prob(self, cx):
         pass
-        # def get_calibrated_value(score):
-        #     # find the nearest x_axis value below score (or the first x_axis value)
-        #     indx = np.searchsorted(self.x_axis, score, side='right') - 1
-        #     # Ensure indx is within the valid range
-        #     if indx < 0:
-        #         indx = 0
-        #     return self.y_axis[indx]
 
-        # return np.array([get_calibrated_value(score) for score in cxs])
+    def get_calibrated_probs(self, cxs):
+        return np.array([self.get_calibrated_prob(cx) for cx in cxs])
 
     def plot(self, **kwds):
         ax = kwds.pop('ax', None)
@@ -83,50 +77,41 @@ def __init__(self, x_axis, y_axis):
         self.y_axis = y_axis
         self.num_bin = len(self.x_axis)
 
-    def get_calibrated_prob(self, cxs):
-        # print(len(self.x_axis), len(self.y_axis))
-        # for cx in cxs:
-        #     if np.searchsorted(self.x_axis, cx) > 9:
-        #         print(self.x_axis)
-        #         print(cx, np.searchsorted(self.x_axis, cx), get_bin_idx(cx, len(self.x_axis)))
-        # return np.array([self.y_axis[np.searchsorted(self.x_axis, score)] for score in cxs])
-        return np.array([self.y_axis[get_bin_idx(cx, size=self.num_bin)] for cx in cxs])
+    def get_calibrated_prob(self, cx):
+        return self.y_axis[get_bin_idx(cx, size=self.num_bin)]
 
 
 class PiecewiseLinearCalibrationCurve(CalibrationCurve):
-    def __init__(self, x_axis, y_axis, bin_means):
+    def __init__(self, x_axis, y_axis, bin_inflections):
         self.original_x_axis = x_axis
         self.original_y_axis = y_axis
         self.original_num_bin = len(self.original_x_axis)
-        self.original_bin_means = bin_means
+        self.original_bin_inflections = bin_inflections
         super().__init__()
 
-    def _get_yval(self, cx):
+    def get_calibrated_prob(self, cx):
         idx = get_bin_idx(cx, size=self.original_num_bin)
-        current_bin_mean = self.original_bin_means[idx]
+        current_bin_inflection = self.original_bin_inflections[idx]
 
-        if cx > current_bin_mean:
+        if cx > current_bin_inflection:
             next_idx = min(idx + 1, self.original_num_bin - 1)
-            next_bin_mean = self.original_bin_means[next_idx]
-            if next_bin_mean == current_bin_mean:
+            next_bin_inflection = self.original_bin_inflections[next_idx]
+            if next_bin_inflection == current_bin_inflection:
                 return self.original_y_axis[idx]
             else:
-                weight = (cx - current_bin_mean) / (next_bin_mean - current_bin_mean)
+                weight = (cx - current_bin_inflection) / (next_bin_inflection - current_bin_inflection)
                 return (1 - weight) * self.original_y_axis[idx] + weight * self.original_y_axis[next_idx]
-        elif cx < current_bin_mean:
+        elif cx < current_bin_inflection:
             prev_idx = max(idx - 1, 0)
-            prev_bin_mean = self.original_bin_means[prev_idx]
-            if prev_bin_mean == current_bin_mean:
+            prev_bin_inflection = self.original_bin_inflections[prev_idx]
+            if prev_bin_inflection == current_bin_inflection:
                 return self.original_y_axis[idx]
             else:
-                weight = (cx - prev_bin_mean) / (current_bin_mean - prev_bin_mean)
+                weight = (cx - prev_bin_inflection) / (current_bin_inflection - prev_bin_inflection)
                 return (1 - weight) * self.original_y_axis[prev_idx] + weight * self.original_y_axis[idx]
-        else:  # cx == current_bin_mean
+        else:  # cx == current_bin_inflection
             return self.original_y_axis[idx]
 
-    def get_calibrated_prob(self, cxs):
-        return np.array([self._get_yval(cx) for cx in cxs])
-
 
 class PlattScaling(CalibrationCurve):
     """
@@ -135,9 +120,11 @@ class PlattScaling(CalibrationCurve):
     def __init__(self, model):
         self.model = model
         super().__init__()
 
-    def get_calibrated_prob(self, cxs):
-        # print(self.model.predict_proba(cxs.reshape(-1, 1))[:, 1])
+    def get_calibrated_prob(self, cx):
+        return self.model.predict_proba(cx.reshape(1, -1))[0, 1]
+
+    def get_calibrated_probs(self, cxs):
         return self.model.predict_proba(cxs.reshape(-1, 1))[:, 1]
 
 
@@ -181,7 +168,7 @@ def transform(self, logits):
         # Apply temperature scaling
         return logits / self.temperature
 
-    def get_calibrated_prob(self, X):
+    def get_calibrated_probs(self, X):
         # Apply temperature scaling and softmax or sigmoid
         X_neg = 1 - X
 
@@ -200,5 +187,8 @@ def __init__(self, model):
         self.model = model
         super().__init__()
 
-    def get_calibrated_prob(self, cxs):
-        return self.model.predict(cxs)
+    def get_calibrated_prob(self, cx):
+        return self.model.predict(np.array(cx))[0]
+
+    def get_calibrated_probs(self, cxs):
+        return self.model.predict(np.array(cxs))
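
Taken together, the calibration_curve.py changes set up a simple contract: subclasses override the scalar get_calibrated_prob(cx), and the base class's get_calibrated_probs(cxs) maps it over an array (fully vectorized implementations such as PlattScaling still override the plural form directly). A minimal sketch of that contract, using a hypothetical ConstantCalibrationCurve subclass invented for illustration (the repo's base method is a bare pass; raising NotImplementedError here just makes the contract explicit):

```python
import numpy as np


class CalibrationCurve:
    """Base class: the scalar method is the extension point."""
    def __init__(self):
        self.num_bin = 100
        self.x_axis = np.arange(0.05 / self.num_bin, 1, 1 / self.num_bin)
        self.y_axis = self.get_calibrated_probs(self.x_axis)

    def get_calibrated_prob(self, cx):
        # scalar classifier score -> calibrated positive probability
        raise NotImplementedError

    def get_calibrated_probs(self, cxs):
        # default vectorization: map the scalar method over the array
        return np.array([self.get_calibrated_prob(cx) for cx in cxs])


class ConstantCalibrationCurve(CalibrationCurve):
    """Hypothetical subclass: every score maps to the same probability."""
    def __init__(self, p=0.5):
        self.p = p
        super().__init__()

    def get_calibrated_prob(self, cx):
        return self.p


curve = ConstantCalibrationCurve(p=0.3)
print(curve.get_calibrated_prob(0.9))                         # 0.3
print(curve.get_calibrated_probs(np.array([0.1, 0.5, 0.9])))  # [0.3 0.3 0.3]
```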
22 changes: 11 additions & 11 deletions pyquantifier/data.py
@@ -338,22 +338,22 @@ def generate_calibration_curve(self, method='platt scaling', num_bin=10):
             model = LogisticRegression(solver='lbfgs', fit_intercept=True, C=c)
             acc_scores = []
             for train_index, test_index in kf.split(train_CX):
-                X_train, X_test = train_CX[train_index], train_CX[test_index]
-                y_train, y_test = train_GT[train_index], train_GT[test_index]
+                CX_train, CX_test = train_CX[train_index], train_CX[test_index]
+                GT_train, GT_test = train_GT[train_index], train_GT[test_index]
 
-                model.fit(X_train, y_train)
-                predictions = model.predict(X_test)
-                acc = accuracy_score(y_test, predictions)
+                model.fit(CX_train, GT_train)
+                predictions = model.predict(CX_test)
+                acc = accuracy_score(GT_test, predictions)
                 acc_scores.append(acc)
 
             avg_acc = sum(acc_scores) / k
             if avg_acc > best_avg_acc:
                 best_c = c
                 best_avg_acc = avg_acc
-            print('Accuracy of each fold:', acc_scores)
-            print(f'c={c}, avg_acc={avg_acc}')
+            # print('Accuracy of each fold:', acc_scores)
+            # print(f'c={c}, avg_acc={avg_acc}')
 
-        print(f'best_c={best_c}, best_avg_acc={best_avg_acc}')
+        # print(f'best_c={best_c}, best_avg_acc={best_avg_acc}')
         best_model = LogisticRegression(solver='lbfgs', fit_intercept=True, C=best_c)
         best_model.fit(train_CX, train_GT)
         return PlattScaling(model=best_model)
@@ -424,7 +424,7 @@ def nll_loss(logits, labels, temperature):
         if method == 'nonparametric binning':
             return BinnedCalibrationCurve(x_axis=x_axis, y_axis=y_axis)
         elif method == 'mid piecewise linear':
-            return PiecewiseLinearCalibrationCurve(x_axis=x_axis, y_axis=y_axis, bin_means=x_axis.copy())
+            return PiecewiseLinearCalibrationCurve(x_axis=x_axis, y_axis=y_axis, bin_inflections=x_axis.copy())
         elif method == 'mean piecewise linear':
             # Initialize bin_centroids with x_axis values, which are means of the bins
             bin_means = x_axis.copy()
@@ -435,7 +435,7 @@ def nll_loss(logits, labels, temperature):
             for bin_idx, mean in df.groupby('bin')['p_pos'].mean().items():
                 bin_means[bin_idx] = mean
 
-            return PiecewiseLinearCalibrationCurve(x_axis=x_axis, y_axis=y_axis, bin_means=bin_means)
+            return PiecewiseLinearCalibrationCurve(x_axis=x_axis, y_axis=y_axis, bin_inflections=bin_means)
 
         else:
             raise ValueError(f'unsupported calibration method, {method}, '
@@ -566,7 +566,7 @@ def intrinsic_estimate(self, class_conditional_densities: dict, method='mixture
         return est_prev
 
     def extrinsic_estimate(self, calibration_curve: CalibrationCurve):
-        self.df['calibr_pos'] = calibration_curve.get_calibrated_prob(self.df['p_pos'].values)
+        self.df['calibr_pos'] = calibration_curve.get_calibrated_probs(self.df['p_pos'].values)
         return self.df['calibr_pos'].sum() / len(self.df)
 
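The generate_calibration_curve hunk above picks the logistic-regression regularization strength C by k-fold cross-validated accuracy, then refits on the full training split before wrapping the model in PlattScaling. A self-contained sketch of that selection loop, with an assumed candidate grid for C, k=5, and toy data standing in for the real train_CX/train_GT:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score

rng = np.random.default_rng(0)
train_CX = rng.random((200, 1))                             # classifier scores
train_GT = (rng.random(200) < train_CX[:, 0]).astype(int)   # ground-truth labels

k = 5
kf = KFold(n_splits=k, shuffle=True, random_state=0)
best_c, best_avg_acc = None, -np.inf
for c in [0.01, 0.1, 1, 10, 100]:                           # assumed candidate grid
    model = LogisticRegression(solver='lbfgs', fit_intercept=True, C=c)
    acc_scores = []
    for train_index, test_index in kf.split(train_CX):
        model.fit(train_CX[train_index], train_GT[train_index])
        predictions = model.predict(train_CX[test_index])
        acc_scores.append(accuracy_score(train_GT[test_index], predictions))
    avg_acc = sum(acc_scores) / k
    if avg_acc > best_avg_acc:
        best_c, best_avg_acc = c, avg_acc

# refit on all training data with the winning C
best_model = LogisticRegression(solver='lbfgs', fit_intercept=True, C=best_c)
best_model.fit(train_CX, train_GT)
```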
8 changes: 4 additions & 4 deletions pyquantifier/distributions.py
@@ -661,10 +661,10 @@ def __init__(self, labels: list,
     def calculate_label_distribution(self, num_bin):
         x_axis = np.arange(0.5/num_bin, 1, 1/num_bin)
         # x_axis = np.linspace(0, 1, num_bin+1)
-        area_pos = np.nansum(self.calibration_curve.get_calibrated_prob(x_axis) * \
+        area_pos = np.nansum(self.calibration_curve.get_calibrated_probs(x_axis) * \
                              np.array([self.classifier_score_distribution.get_density(x)
                                        for x in x_axis]))
-        area_neg = np.nansum((1 - self.calibration_curve.get_calibrated_prob(x_axis)) * \
+        area_neg = np.nansum((1 - self.calibration_curve.get_calibrated_probs(x_axis)) * \
                              np.array([self.classifier_score_distribution.get_density(x)
                                        for x in x_axis]))
         total_area = area_pos + area_neg
@@ -675,10 +675,10 @@ def calculate_class_conditional_densities(self, num_bin):
     def calculate_class_conditional_densities(self, num_bin):
         x_axis = np.arange(0.5/num_bin, 1, 1/num_bin)
         # x_axis = np.linspace(0, 1, num_bin+1)
-        curve_pos = self.calibration_curve.get_calibrated_prob(x_axis) * \
+        curve_pos = self.calibration_curve.get_calibrated_probs(x_axis) * \
                     np.array([self.classifier_score_distribution.get_density(x)
                               for x in x_axis])
-        curve_neg = (1 - self.calibration_curve.get_calibrated_prob(x_axis)) * \
+        curve_neg = (1 - self.calibration_curve.get_calibrated_probs(x_axis)) * \
                     np.array([self.classifier_score_distribution.get_density(x)
                               for x in x_axis])
         curve_pos = np.array(curve_pos) / sum(curve_pos) * num_bin
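Both methods in this distributions.py hunk approximate an integral over the classifier-score distribution with a midpoint Riemann sum: per bin, the positive mass is the calibrated probability times the score density, the negative mass is its complement, and the two are then normalized. A tiny numeric sketch of the label-distribution computation, assuming a uniform score density and an identity (perfectly calibrated) curve:

```python
import numpy as np

num_bin = 10
x_axis = np.arange(0.5 / num_bin, 1, 1 / num_bin)   # bin midpoints

get_calibrated_probs = lambda xs: xs                # assumed: identity calibration curve
get_density = lambda x: 1.0                         # assumed: uniform score density

density = np.array([get_density(x) for x in x_axis])
area_pos = np.nansum(get_calibrated_probs(x_axis) * density)
area_neg = np.nansum((1 - get_calibrated_probs(x_axis)) * density)

total_area = area_pos + area_neg
print(area_pos / total_area)   # positive prevalence; 0.5 for this toy setup
```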
2 changes: 1 addition & 1 deletion pyquantifier/estimator.py
@@ -120,7 +120,7 @@ def set_calibration_curve(self, calibration_curve: CalibrationCurve):
         self.calibration_curve = calibration_curve
 
     def estimate(self, cx_array):
-        calibrated_prob_array = self.calibration_curve.get_calibrated_prob(cx_array)
+        calibrated_prob_array = self.calibration_curve.get_calibrated_probs(cx_array)
         return np.mean(calibrated_prob_array)
 
     def plot(self, cx_array, num_bin=100):
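In estimator.py the extrinsic prevalence estimate is simply the mean calibrated probability over the scored items, which is why this call site moves to the vectorized get_calibrated_probs. A minimal sketch, with a hypothetical IdentityCurve standing in for a fitted CalibrationCurve:

```python
import numpy as np

# assumed: any CalibrationCurve subclass; here a trivial identity mapping
class IdentityCurve:
    def get_calibrated_probs(self, cxs):
        return np.asarray(cxs)

cx_array = np.array([0.2, 0.4, 0.9])   # raw classifier scores
curve = IdentityCurve()
estimate = np.mean(curve.get_calibrated_probs(cx_array))
print(estimate)                        # 0.5 = estimated positive prevalence
```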
2 changes: 1 addition & 1 deletion pyquantifier/plot.py
@@ -96,7 +96,7 @@ def plot_stacked_frequency(x_axis, freq_hist, calibration_curve, ax=None, fig_na
     if ax is None:
         ax = prepare_canvas()
 
-    cali_prob_array = calibration_curve.get_calibrated_prob(x_axis)
+    cali_prob_array = calibration_curve.get_calibrated_probs(x_axis)
     weighted_freq_hist = cali_prob_array * freq_hist
 
     one_gradient_plot(ax, x_axis, weighted_freq_hist,
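plot_stacked_frequency weights each score bin's frequency by its calibrated positive probability, so the plot splits the histogram into estimated positive and negative mass. A rough matplotlib sketch of the same weighting (one_gradient_plot and prepare_canvas are pyquantifier internals, so plain bar plots stand in for them):

```python
import numpy as np
import matplotlib.pyplot as plt

num_bin = 10
x_axis = np.arange(0.5 / num_bin, 1, 1 / num_bin)
freq_hist = np.random.default_rng(0).integers(5, 50, num_bin)  # toy frequencies
cali_prob_array = x_axis                                       # assumed calibration curve

weighted_freq_hist = cali_prob_array * freq_hist               # estimated positives per bin

plt.bar(x_axis, freq_hist, width=1 / num_bin, label='all items')
plt.bar(x_axis, weighted_freq_hist, width=1 / num_bin, label='estimated positives')
plt.xlabel('classifier score')
plt.ylabel('frequency')
plt.legend()
plt.show()
```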
