-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlinear_assignment_numpy.py
247 lines (203 loc) · 8.04 KB
/
linear_assignment_numpy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# coding: utf-8
# Copyright (c) Pymatgen Development Team.
# Distributed under the terms of the MIT License.
from __future__ import division, unicode_literals
"""
This module contains an algorithm to solve the Linear Assignment Problem.
It has the same functionality as linear_assignment.pyx, but is much slower
as it is vectorized in numpy rather than cython
"""
__author__ = "Will Richards"
__copyright__ = "Copyright 2011, The Materials Project"
__version__ = "1.0"
__maintainer__ = "Will Richards"
__email__ = "wrichards@mit.edu"
__date__ = "Jan 28, 2013"
import numpy as np
from six.moves import range
class LinearAssignment(object):
"""
This class finds the solution to the Linear Assignment Problem.
It finds a minimum cost matching between two sets, given a cost
matrix.
This class is an implementation of the LAPJV algorithm described in:
R. Jonker, A. Volgenant. A Shortest Augmenting Path Algorithm for
Dense and Sparse Linear Assignment Problems. Computing 38, 325-340
(1987)
Args:
costs: The cost matrix of the problem. cost[i,j] should be the
cost of matching x[i] to y[j]. The cost matrix may be
rectangular
epsilon: Tolerance for determining if solution vector is < 0
.. attribute: min_cost:
The minimum cost of the matching
.. attribute: solution:
The matching of the rows to columns. i.e solution = [1, 2, 0]
would match row 0 to column 1, row 1 to column 2 and row 2
to column 0. Total cost would be c[0, 1] + c[1, 2] + c[2, 0]
"""
def __init__(self, costs, epsilon=1e-6):
self.orig_c = np.array(costs, dtype=np.float64)
self.nx, self.ny = self.orig_c.shape
self.n = self.ny
self._inds = np.arange(self.n)
self.epsilon = abs(epsilon)
#check that cost matrix is square
if self.nx > self.ny:
raise ValueError("cost matrix must have at least as many columns as rows")
if self.nx == self.ny:
self.c = self.orig_c
else:
# Can run into precision issues if np.max is used as the fill value (since a
# value of this size doesn't necessarily end up in the solution). A value
# at least as large as the maximin is, however, guaranteed to appear so it
# is a safer choice. The fill value is not zero to avoid choosing the extra
# rows in the initial column reduction step
self.c = np.full((self.n, self.n), np.max(np.min(self.orig_c, axis=1)))
self.c[:self.nx] = self.orig_c
#initialize solution vectors
self._x = np.zeros(self.n, dtype=np.int) - 1
self._y = self._x.copy()
#if column reduction doesn't find a solution, augment with shortest
#paths until one is found
if self._column_reduction():
self._augmenting_row_reduction()
#initialize the reduced costs
self._update_cred()
while -1 in self._x:
self._augment()
self.solution = self._x[:self.nx]
self._min_cost = None
@property
def min_cost(self):
"""
Returns the cost of the best assignment
"""
if self._min_cost:
return self._min_cost
self._min_cost = np.sum(self.c[np.arange(self.nx), self.solution])
return self._min_cost
def _column_reduction(self):
"""
Column reduction and reduction transfer steps from LAPJV algorithm
"""
#assign each column to its lowest cost row, ensuring that only row
#or column is assigned once
i1, j = np.unique(np.argmin(self.c, axis=0), return_index=True)
self._x[i1] = j
#if problem is solved, return
if len(i1) == self.n:
return False
self._y[j] = i1
#reduction_transfer
#tempc is array with previously assigned matchings masked
self._v = np.min(self.c, axis=0)
tempc = self.c.copy()
tempc[i1, j] = np.inf
mu = np.min(tempc[i1, :] - self._v[None, :], axis=1)
self._v[j] -= mu
return True
def _augmenting_row_reduction(self):
"""
Augmenting row reduction step from LAPJV algorithm
"""
unassigned = np.where(self._x == -1)[0]
for i in unassigned:
for _ in range(self.c.size):
# Time in this loop can be proportional to 1/epsilon
# This step is not strictly necessary, so cutoff early
# to avoid near-infinite loops
# find smallest 2 values and indices
temp = self.c[i] - self._v
j1 = np.argmin(temp)
u1 = temp[j1]
temp[j1] = np.inf
j2 = np.argmin(temp)
u2 = temp[j2]
if u1 < u2:
self._v[j1] -= u2 - u1
elif self._y[j1] != -1:
j1 = j2
k = self._y[j1]
if k != -1:
self._x[k] = -1
self._x[i] = j1
self._y[j1] = i
i = k
if k == -1 or abs(u1 - u2) < self.epsilon:
break
def _update_cred(self):
"""
Updates the reduced costs with the values from the
dual solution
"""
ui = self.c[self._inds, self._x] - self._v[self._x]
self.cred = self.c - ui[:, None] - self._v[None, :]
def _augment(self):
"""
Finds a minimum cost path and adds it to the matching
"""
#build a minimum cost tree
_pred, _ready, istar, j, mu = self._build_tree()
#update prices
self._v[_ready] += self._d[_ready] - mu
#augment the solution with the minimum cost path from the
#tree. Follows an alternating path along matched, unmatched
#edges from X to Y
while True:
i = _pred[j]
self._y[j] = i
k = j
j = self._x[i]
self._x[i] = k
if i == istar:
break
self._update_cred()
def _build_tree(self):
"""
Builds the tree finding an augmenting path. Alternates along
matched and unmatched edges between X and Y. The paths are
stored in _pred (new predecessor of nodes in Y), and
self._x and self._y
"""
#find unassigned i*
istar = np.argmin(self._x)
#compute distances
self._d = self.c[istar] - self._v
_pred = np.zeros(self.n, dtype=np.int) + istar
#initialize sets
#READY: set of nodes visited and in the path (whose price gets
#updated in augment)
#SCAN: set of nodes at the bottom of the tree, which we need to
#look at
#T0DO: unvisited nodes
_ready = np.zeros(self.n, dtype=np.bool)
_scan = np.zeros(self.n, dtype=np.bool)
_todo = np.zeros(self.n, dtype=np.bool) + True
while True:
#populate scan with minimum reduced distances
if True not in _scan:
mu = np.min(self._d[_todo])
_scan[self._d == mu] = True
_todo[_scan] = False
j = np.argmin(self._y * _scan)
if self._y[j] == -1 and _scan[j]:
return _pred, _ready, istar, j, mu
#pick jstar from scan (scan always has at least 1)
_jstar = np.argmax(_scan)
#pick i associated with jstar
i = self._y[_jstar]
_scan[_jstar] = False
_ready[_jstar] = True
#find shorter distances
newdists = mu + self.cred[i, :]
shorter = np.logical_and(newdists < self._d, _todo)
#update distances
self._d[shorter] = newdists[shorter]
#update predecessors
_pred[shorter] = i
for j in np.nonzero(np.logical_and(self._d == mu, _todo))[0]:
if self._y[j] == -1:
return _pred, _ready, istar, j, mu
_scan[j] = True
_todo[j] = False