-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathknn_imputer.py
145 lines (118 loc) · 5.33 KB
/
knn_imputer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import copy
import numpy.ma as ma
from tqdm import tqdm
from constants import *
from sklearn.impute import KNNImputer
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
def cast_int(i):
    """Convert a single scalar to a built-in ``int`` (truncates toward zero)."""
    return int(i)


# Element-wise version of ``cast_int`` that can be applied to whole arrays.
castFunction = np.vectorize(cast_int)
def find_label(data, labels):
    """
    Snap every element of `data` onto the closest entry of `labels`.

    Each element is truncated with ``int()`` before the comparison, so the
    distance is measured from the truncated value, not the raw one. When two
    labels are equally close, the one that comes first in `labels` wins.

    Parameters:
        data (array-like): Values to be mapped.
        labels (list or array-like): Candidate labels.
    Returns:
        list: One chosen label per element of `data`, in the same order.
    """
    chosen = 0  # carried across elements; stays 0 if `labels` never yields a match
    mapped = []
    for value in data:
        best_gap = float('inf')
        for candidate in labels:
            gap = abs(int(value) - candidate)
            # Strict comparison keeps the earliest label on ties.
            if not gap < best_gap:
                continue
            chosen, best_gap = candidate, gap
        mapped.append(chosen)
    return mapped
def tune_knn(x_fold, columns, rng=(1, 31), n_splits=32, shuffle=True, random_state=2021, average='binary'):
    """
    Tunes the number of neighbors (k) for KNN Imputer using K-Fold cross-validation.

    For every candidate k and every column in `columns`, the held-out rows of
    that column are blanked out on a min-max scaled copy of the data, re-imputed
    with KNNImputer, scaled back, truncated to integers and compared against a
    reference copy of the input whose missing values were filled with the
    column mean (also truncated to integers).

    Parameters:
        x_fold (array-like): Input dataset with missing values (NaN).
        columns (list): Positional column indices to perform imputation on.
        rng (tuple): (start, stop) handed to `range`; `stop` is exclusive, so the
            default (1, 31) evaluates k = 1..30.
        n_splits (int): Number of folds for cross-validation. Default is 32.
        shuffle (bool): Whether to shuffle data before splitting. Default is True.
        random_state (int): Seed for reproducibility. Only used when `shuffle` is
            True. Default is 2021.
        average (str): Unused; kept so existing callers keep working.
    Returns:
        list: Mean squared error for each number of neighbors evaluated, in
        evaluation order.
    """
    scaler = MinMaxScaler()

    # Reference values: NaNs replaced by the mean of the observed entries of
    # their column, then truncated to integers.
    missing = np.isnan(x_fold)
    column_means = ma.array(x_fold, mask=missing).mean(axis=0)
    y_fold = castFunction(np.where(missing, column_means, x_fold))

    # Scaling depends only on x_fold, so fit and transform once instead of
    # once per (k, column, fold).
    x_scaled = scaler.fit_transform(x_fold)

    # KFold rejects a random_state when shuffle is False, so only pass the
    # seed when it is actually used.
    cv = KFold(n_splits=n_splits, shuffle=shuffle, random_state=random_state if shuffle else None)

    accuracy = []
    for neighbors in tqdm(range(rng[0], rng[1]), desc="Processing"):
        algo = KNNImputer(n_neighbors=neighbors)
        temp = []
        for column in columns:
            for _, test_index in cv.split(x_fold):
                x_train = x_scaled.copy()
                x_train[test_index, [column]] = np.nan  # hide the held-out values
                x_train = algo.fit_transform(x_train)  # impute them back
                x_train = scaler.inverse_transform(x_train)  # undo the scaling
                x_train = castFunction(x_train)  # truncate to integers
                # Error on the held-out cells only
                temp.append(mean_squared_error(x_train[test_index, [column]], y_fold[test_index, [column]]))
        accuracy.append(np.array(temp).mean())
    return accuracy
class Imputer:
    """
    A class for handling missing values using KNN Imputer and performing data cleanup.

    Attributes:
        df_init (pd.DataFrame): Original dataset with missing values.
        df (pd.DataFrame): Dataset after imputation.
    """

    def __init__(self, df, k=15):
        """
        Initializes the Imputer with a dataset and performs KNN Imputation.

        Parameters:
            df (pd.DataFrame): Input dataset with missing values.
            k (int): Number of neighbors for KNN Imputer. Default is 15.
        """
        self.df_init = df
        self.df = df.copy()  # keep the caller's frame untouched
        imputer = KNNImputer(n_neighbors=k)
        # Perform KNN Imputation and write the result back column by column
        self.df[self.df.columns] = imputer.fit_transform(self.df)

    def nearest_value(self, x, uniqueValues):
        """
        Finds the nearest value to `x` from the set of unique values.

        Parameters:
            x (float): Value to find the nearest match for.
            uniqueValues (array): Array of unique values to compare against.
        Returns:
            float: Nearest value from `uniqueValues` (the first one on ties).
        """
        diff = np.abs(uniqueValues - x)
        return uniqueValues[np.argmin(diff)]

    def clean(self, missValCols, dataTypeDict):
        """
        Snaps imputed values of categorical/binary columns back onto observed values.

        Only columns with a positive missing-value count whose data type is
        'category' or 'binary' are touched; every other column is left as is.

        Parameters:
            missValCols (dict): Dictionary mapping columns to their count of missing values.
            dataTypeDict (dict): Dictionary mapping columns to their data types.
        Modifies:
            - `self.df`: each value of an affected column is replaced by the
              nearest non-missing value seen in the original dataset.
        """
        for col, missing_count in missValCols.items():
            if missing_count > 0:
                dataType = dataTypeDict[col]
                if dataType in ('category', 'binary'):
                    # Drop NaN explicitly: unique() keeps first-appearance order,
                    # so NaN is not guaranteed to be the last entry, and a NaN
                    # candidate would win every argmin.
                    uniqueValues = np.array(self.df_init[col].dropna().unique())
                    self.df[col] = self.df[col].apply(lambda x: self.nearest_value(x, uniqueValues))