-
Notifications
You must be signed in to change notification settings - Fork 2
/
blpkm_cc.py
140 lines (100 loc) · 3.98 KB
/
blpkm_cc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# © 2021, Universität Bern, Chair of Quantitative Methods, Philipp Baumann
from scipy.spatial.distance import cdist
import gurobipy as gb
import numpy as np
def update_centers(X, centers, n_clusters, labels):
"""Update positions of cluster centers
Args:
X (np.array): feature vectors of objects
centers (np.array): current positions of cluster centers
n_clusters (int): predefined number of clusters
labels (np.array): current cluster assignments of objects
Returns:
np.array: the updated positions of cluster centers
"""
for i in range(n_clusters):
centers[i] = X[labels == i, :].mean(axis=0)
return centers
def assign_objects(X, centers, ml, cl):
"""Assigns objects to clusters
Args:
X (np.array): feature vectors of objects
centers (np.array): current positions of cluster centers
ml (list): must-link pairs as a list of tuples
cl (list): cannot-link pairs as a list of tuples
Returns:
np.array: cluster labels for objects
"""
# Compute model input
n = X.shape[0]
k = centers.shape[0]
distances = cdist(X, centers)
assignments = {(i, j): distances[i, j] for i in range(n) for j in range(k)}
# Create model
m = gb.Model()
# Add binary decision variables
y = m.addVars(assignments, obj=assignments, vtype=gb.GRB.BINARY)
# Add constraints
m.addConstrs(y.sum(i, '*') == 1 for i in range(n))
m.addConstrs(y.sum('*', j) >= 1 for j in range(k))
m.addConstrs(y[i, j] == y[i_, j] for j in range(k) for i, i_ in ml)
m.addConstrs(y[i, j] + y[i_, j] <= 1 for j in range(k) for i, i_ in cl)
# Determine optimal solution
m.optimize()
# Get labels from optimal assignment
labels = np.array([j for i, j in y.keys() if y[i, j].X > 0.5])
return labels
def get_total_distance(X, centers, labels):
"""Computes total distance between objects and cluster centers
Args:
X (np.array): feature vectors of objects
centers (np.array): current positions of cluster centers
labels (np.array): current cluster assignments of objects
Returns:
float: total distance
"""
dist = np.sqrt(((X - centers[labels, :]) ** 2).sum(axis=1)).sum()
return dist
def blpkm_cc(X, n_clusters, ml=[], cl=[], random_state=None, max_iter=100):
"""Finds partition of X subject to must-link and cannot-link constraints
Args:
X (np.array): feature vectors of objects
n_clusters (int): predefined number of clusters
ml (list): must-link pairs as a list of tuples
cl (list): cannot-link pairs as a list of tuples
random_state (int, RandomState instance): random state
max_iter (int): maximum number of iterations of blpkm_cc algorithm
Returns:
np.array: cluster labels of objects
"""
# Choose initial cluster centers randomly
np.random.seed(random_state)
center_ids = np.random.choice(np.arange(X.shape[0]), size=n_clusters,
replace=False)
centers = X[center_ids, :]
# Assign objects
labels = assign_objects(X, centers, ml, cl)
# Initialize best labels
best_labels = labels
# Update centers
centers = update_centers(X, centers, n_clusters, labels)
# Compute total distance
best_total_distance = get_total_distance(X, centers, labels)
n_iter = 0
while n_iter < max_iter:
# Assign objects
labels = assign_objects(X, centers, ml, cl)
# Update centers
centers = update_centers(X, centers, n_clusters, labels)
# Compute total distance
total_distance = get_total_distance(X, centers, labels)
# Check stopping criterion
if total_distance >= best_total_distance:
break
else:
# Update best labels and best total distance
best_labels = labels
best_total_distance = total_distance
# Increase iteration counter
n_iter += 1
return best_labels