-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEqualGroupsKMeans.py
168 lines (139 loc) · 6.34 KB
/
EqualGroupsKMeans.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""Equal Groups K-Means clustering utilizing the scikit-learn api and related
utilities.
BSD 3-clause "New" or "Revised" License
version 0.17.1
"""
#****************************************************************************************************
# The original version of the code is available at this address: *
# *
# https://github.com/ndanielsen/Same-Size-K-Means?tab=BSD-3-Clause-1-ov-file#readme *
# *
# This file has been changed and updated according to the need. *
#****************************************************************************************************
import warnings
import numpy as np
import scipy.sparse as sp
from sklearn.base import BaseEstimator, ClusterMixin, TransformerMixin
from sklearn.cluster import KMeans
from joblib import Parallel, delayed
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.utils.validation import check_is_fitted, FLOAT_DTYPES
from sklearn.utils import check_array, check_random_state
import numpy as np
import scipy.sparse as sp
class EqualGroupsKMeans(BaseEstimator, ClusterMixin, TransformerMixin):
    """K-Means estimator exposing the scikit-learn cluster/transformer API.

    In this file ``fit`` delegates entirely to ``sklearn.cluster.KMeans`` and
    copies the fitted attributes onto this instance; no additional step that
    balances cluster sizes is performed here.

    Parameters
    ----------
    n_clusters : int, default=8
        Forwarded to ``KMeans`` as ``n_clusters``.
    init : default='k-means++'
        Forwarded to ``KMeans`` as ``init``.
    n_init : default=10
        Forwarded to ``KMeans`` as ``n_init``.
    max_iter : default=300
        Forwarded to ``KMeans`` as ``max_iter``.
    tol : default=1e-4
        Forwarded to ``KMeans`` as ``tol``.
    precompute_distances : default='auto'
        Stored on the instance for interface compatibility; not used by any
        method of this class.
    verbose : default=0
        Forwarded to ``KMeans`` as ``verbose``.
    random_state : default=None
        Passed through ``check_random_state`` and then to ``KMeans``.
    copy_x : default=True
        Forwarded to ``KMeans`` as ``copy_x``.
    n_jobs : default=1
        Stored on the instance for interface compatibility; not used by any
        method of this class.

    Attributes
    ----------
    cluster_centers_, labels_, inertia_, n_iter_
        ``None`` until ``fit`` is called, then the values of the attributes
        with the same names on the fitted ``KMeans`` object.
    """

    def __init__(self, n_clusters=8, init='k-means++', n_init=10, max_iter=300,
                 tol=1e-4, precompute_distances='auto', verbose=0,
                 random_state=None, copy_x=True, n_jobs=1):
        self.n_clusters = n_clusters
        self.init = init
        self.max_iter = max_iter
        self.tol = tol
        self.precompute_distances = precompute_distances
        self.n_init = n_init
        self.verbose = verbose
        self.random_state = random_state
        self.copy_x = copy_x
        self.n_jobs = n_jobs
        # Fitted state. Kept as None placeholders (rather than left undefined)
        # so existing callers that read them before fit() keep working;
        # _check_fitted() accounts for the placeholders.
        self.cluster_centers_ = None
        self.labels_ = None
        self.inertia_ = None
        self.n_iter_ = None

    def _check_fit_data(self, X):
        """Validate training data and verify n_samples >= n_clusters.

        Returns the array produced by ``check_array`` (float64, CSR accepted).

        Raises
        ------
        ValueError
            If X has fewer rows than ``self.n_clusters``.
        """
        X = check_array(X, accept_sparse='csr', dtype=np.float64)
        if X.shape[0] < self.n_clusters:
            raise ValueError("n_samples=%d should be >= n_clusters=%d" % (
                X.shape[0], self.n_clusters))
        return X

    def _check_test_data(self, X):
        """Validate data passed to transform/predict/score.

        Returns the array produced by ``check_array``.

        Raises
        ------
        ValueError
            If the number of columns of X differs from the number of columns
            of ``self.cluster_centers_``.
        """
        # `warn_on_dtype` is intentionally not passed: the keyword no longer
        # exists in the scikit-learn releases whose KMeans signature fit()
        # relies on, and passing it raises TypeError.
        X = check_array(X, accept_sparse='csr', dtype=FLOAT_DTYPES)
        n_features = X.shape[1]
        expected_n_features = self.cluster_centers_.shape[1]
        if n_features != expected_n_features:
            raise ValueError("Incorrect number of features. "
                             "Got %d features, expected %d" % (
                                 n_features, expected_n_features))
        return X

    def _check_fitted(self):
        """Raise NotFittedError unless fit() has populated the centers."""
        check_is_fitted(self, 'cluster_centers_')
        # __init__ pre-creates `cluster_centers_` as None, so the attribute
        # lookup done by check_is_fitted succeeds even before fit(); test the
        # value explicitly as well.
        if self.cluster_centers_ is None:
            from sklearn.exceptions import NotFittedError
            raise NotFittedError(
                "This %s instance is not fitted yet. Call 'fit' with "
                "appropriate arguments before using this estimator."
                % type(self).__name__)

    def fit(self, X, y=None):
        """Fit a ``KMeans`` model on X and copy its results onto this object.

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, n_features]
            Training data.
        y : ignored

        Returns
        -------
        self
        """
        random_state = check_random_state(self.random_state)
        X = self._check_fit_data(X)
        kmeans = KMeans(n_clusters=self.n_clusters, init=self.init,
                        n_init=self.n_init, max_iter=self.max_iter,
                        tol=self.tol, verbose=self.verbose,
                        random_state=random_state, copy_x=self.copy_x)
        kmeans.fit(X)
        self.cluster_centers_ = kmeans.cluster_centers_
        self.labels_ = kmeans.labels_
        self.inertia_ = kmeans.inertia_
        self.n_iter_ = kmeans.n_iter_
        return self

    def fit_predict(self, X, y=None):
        """Compute cluster centers and return the training labels.

        Returns the ``labels_`` attribute set by ``fit(X)``.
        """
        return self.fit(X).labels_

    def fit_transform(self, X, y=None):
        """Compute clustering and transform X to cluster-distance space.

        Validates X once with the fit-time validator, fits, and then computes
        the distances directly, skipping the test-time validation that
        ``transform`` would perform.
        """
        X = self._check_fit_data(X)
        return self.fit(X)._transform(X)

    def transform(self, X, y=None):
        """Transform X to a cluster-distance space.

        In the new space, each dimension is the distance to one of the
        cluster centers.

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, n_features]
            New data to transform.

        Returns
        -------
        X_new : array, shape [n_samples, k]
            X transformed in the new space.
        """
        self._check_fitted()
        X = self._check_test_data(X)
        return self._transform(X)

    def _transform(self, X):
        """Guts of transform method; no input validation."""
        return euclidean_distances(X, self.cluster_centers_)

    def predict(self, X):
        """Predict the closest cluster each sample in X belongs to.

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, n_features]
            New data to predict.

        Returns
        -------
        labels : array, shape [n_samples,]
            Index of the row of ``cluster_centers_`` with the smallest
            Euclidean distance to each sample.
        """
        self._check_fitted()
        X = self._check_test_data(X)
        return np.argmin(self._transform(X), axis=1)

    def score(self, X, y=None):
        """Opposite of the value of X on the K-means objective.

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, n_features]
            New data.

        Returns
        -------
        score : float
            Negative sum, over the samples, of the squared Euclidean distance
            to the closest cluster center.
        """
        self._check_fitted()
        X = self._check_test_data(X)
        squared = euclidean_distances(X, self.cluster_centers_, squared=True)
        return -squared.min(axis=1).sum()