# metrics.py (forked from ssykiotis/ELECTRIcity_NILM)

import torch
import numpy as np
from sklearn.metrics import confusion_matrix


def acc_precision_recall_f1_score(status, status_pred):
    """Row-wise accuracy, precision, recall and F1 score for on/off status predictions."""
    assert status.shape == status_pred.shape

    # Convert torch tensors to numpy and flatten to (rows, timesteps).
    if not isinstance(status, np.ndarray):
        status = status.detach().cpu().numpy().squeeze()
    if not isinstance(status_pred, np.ndarray):
        status_pred = status_pred.detach().cpu().numpy().squeeze()
    status = status.reshape(status.shape[0], -1)
    status_pred = status_pred.reshape(status_pred.shape[0], -1)

    accs, precisions, recalls, f1_scores = [], [], [], []
    for i in range(status.shape[0]):
        tn, fp, fn, tp = confusion_matrix(status[i, :], status_pred[i, :], labels=[0, 1]).ravel()
        acc = (tn + tp) / (tn + fp + fn + tp)
        # Guard the denominators with 1e-9 to avoid division by zero.
        precision = tp / np.max((tp + fp, 1e-9))
        recall = tp / np.max((tp + fn, 1e-9))
        f1_score = 2 * (precision * recall) / np.max((precision + recall, 1e-9))

        accs.append(acc)
        precisions.append(precision)
        recalls.append(recall)
        f1_scores.append(f1_score)

    return np.array(accs), np.array(precisions), np.array(recalls), np.array(f1_scores)


def regression_errors(pred, label):
    """Row-wise mean absolute error (MAE) and mean relative error (MRE)."""
    assert pred.shape == label.shape

    # Convert torch tensors to numpy and flatten to (rows, timesteps).
    if not isinstance(pred, np.ndarray):
        pred = pred.detach().cpu().numpy().squeeze()
    if not isinstance(label, np.ndarray):
        label = label.detach().cpu().numpy().squeeze()
    pred = pred.reshape(pred.shape[0], -1)
    label = label.reshape(label.shape[0], -1)

    epsilon = np.full(label.shape, 1e-9)

    mae_arr, mre_arr = [], []
    for i in range(label.shape[0]):
        abs_diff = np.abs(label[i, :] - pred[i, :])
        mae = np.mean(abs_diff)
        # Relative error: normalise by the element-wise max of label, prediction
        # and epsilon so the denominator is never zero.
        mre_num = np.nan_to_num(abs_diff)
        mre_den = np.max((label[i, :], pred[i, :], epsilon[i, :]), axis=0)
        mre = np.mean(mre_num / mre_den)

        mae_arr.append(mae)
        mre_arr.append(mre)

    return np.array(mae_arr), np.array(mre_arr)


def compute_status(data, threshold, min_on, min_off):
    """Threshold power readings into on/off status, dropping on-events shorter
    than min_on and merging on-events separated by gaps shorter than min_off."""
    status = np.zeros(data.shape)
    if len(data.squeeze().shape) == 1:
        columns = 1
    else:
        columns = data.squeeze().shape[-1]

    # Scalar parameters are wrapped into single-element lists so they can be
    # indexed per column below.
    threshold = [threshold]
    min_on = [min_on]
    min_off = [min_off]

    for i in range(columns):
        # Thresholding gives the raw status; diff marks the on/off transitions.
        initial_status = data[:, i] >= threshold[i]
        status_diff = np.diff(initial_status)
        events_idx = status_diff.nonzero()

        events_idx = np.array(events_idx).squeeze()
        events_idx += 1

        # Pad with a leading/trailing index if the series starts or ends "on",
        # so transitions can be paired into (on, off) intervals.
        if initial_status[0]:
            events_idx = np.insert(events_idx, 0, 0)
        if initial_status[-1]:
            events_idx = np.insert(events_idx, events_idx.size, initial_status.size)

        events_idx = events_idx.reshape((-1, 2))
        on_events = events_idx[:, 0].copy()
        off_events = events_idx[:, 1].copy()
        assert len(on_events) == len(off_events)

        if len(on_events) > 0:
            # Merge on-intervals separated by an off-gap shorter than min_off.
            off_duration = on_events[1:] - off_events[:-1]
            off_duration = np.insert(off_duration, 0, 1000)
            on_events = on_events[off_duration > min_off[i]]
            off_events = off_events[np.roll(off_duration, -1) > min_off[i]]

            # Discard on-intervals shorter than min_on.
            on_duration = off_events - on_events
            on_events = on_events[on_duration >= min_on[i]]
            off_events = off_events[on_duration >= min_on[i]]
            assert len(on_events) == len(off_events)

        # Rebuild the cleaned status signal from the surviving (on, off) pairs.
        temp_status = data[:, i].copy()
        temp_status[:] = 0
        for on, off in zip(on_events, off_events):
            temp_status[on:off] = 1
        status[:, i] = temp_status

    return status
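

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: the synthetic power
    # traces, the 15 W threshold and the min_on/min_off durations below are
    # illustrative assumptions, not values taken from the ELECTRIcity_NILM
    # configuration. It only shows how the three functions above fit together.
    rng = np.random.default_rng(0)

    # Fake (timesteps, appliances) power traces for compute_status.
    power_true = rng.uniform(0, 100, size=(600, 1))
    power_pred = power_true + rng.normal(0, 5, size=power_true.shape)

    # Ground-truth and predicted on/off status from the power signals.
    status_true = compute_status(power_true, threshold=15, min_on=5, min_off=5)
    status_pred = compute_status(power_pred, threshold=15, min_on=5, min_off=5)

    # The metric functions iterate over the first axis, so pass (rows, timesteps).
    acc, prec, rec, f1 = acc_precision_recall_f1_score(status_true.T, status_pred.T)
    mae, mre = regression_errors(power_pred.T, power_true.T)

    print("accuracy:", acc, "precision:", prec, "recall:", rec, "f1:", f1)
    print("MAE:", mae, "MRE:", mre)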