-
Notifications
You must be signed in to change notification settings - Fork 26
/
utils.py
258 lines (218 loc) · 8.69 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# -*- coding: utf-8 -*-
"""
Created on Sat Feb 8 02:27:20 2020
@author: ZhiningLiu1998
mailto: [email protected] / [email protected]
"""
import pandas as pd
import numpy as np
from sklearn.metrics import (
f1_score,
average_precision_score,
matthews_corrcoef,
)
from sklearn.model_selection import train_test_split
class Rater():
    """Rater for evaluating classifier performance on class-imbalanced data.

    Parameters
    ----------
    metric : {'aucprc', 'mcc', 'fscore', 'bacc'}, optional (default='aucprc')
        Specify the performance metric used for evaluation.
        If 'aucprc' then use Area Under Precision-Recall Curve.
        If 'mcc' then use Matthews Correlation Coefficient.
        If 'fscore' then use F1-score, also known as balanced F-score or F-measure.
        If 'bacc' then use balanced accuracy (mean of the per-class recalls).
        Passing other values raises a ValueError.

    threshold : float, optional (default=0.5)
        The threshold used for binarizing the predicted probability.
        It does not affect the AUCPRC score.

    Attributes
    ----------
    metric_ : string
        The performance metric used for evaluation.

    threshold_ : float
        The predict threshold.
    """

    _SUPPORTED_METRICS = ('aucprc', 'mcc', 'fscore', 'bacc')

    def __init__(self, metric='aucprc', threshold=0.5):
        if metric not in self._SUPPORTED_METRICS:
            supported = ', '.join(self._SUPPORTED_METRICS)
            raise ValueError(
                f'Metric {metric} is not supported.'
                f'\nSupport metrics: [{supported}].'
            )
        self.metric_ = metric
        self.threshold_ = threshold

    def _binarize(self, y_pred):
        """Turn predicted probabilities into hard 0/1 labels.

        A single vectorized comparison is used so that the result does not
        depend on the order of two in-place assignments (which mislabels
        values when the threshold is <= 0). Values that fail the comparison,
        including NaN, map to 0.
        """
        return (np.asarray(y_pred) >= self.threshold_).astype(int)

    def score(self, y_true, y_pred):
        """Score function.

        Parameters
        ----------
        y_true : array-like of shape = [n_samples]
            The ground truth labels. For the thresholded metrics the labels
            are compared against binarized predictions, i.e. 0/1 values.

        y_pred : array-like of shape = [n_samples]
            The predict probabilities.

        Returns
        ----------
        score: float

        Raises
        ----------
        ValueError
            If ``metric_`` was changed to an unsupported value after
            construction.
        """
        if self.metric_ == 'aucprc':
            # Ranking metric: uses the raw probabilities, no threshold.
            return average_precision_score(y_true, y_pred)

        y_pred_b = self._binarize(y_pred)
        if self.metric_ == 'mcc':
            return matthews_corrcoef(y_true, y_pred_b)
        if self.metric_ == 'fscore':
            return f1_score(y_true, y_pred_b)
        if self.metric_ == 'bacc':
            # Balanced accuracy: average recall over the classes that occur
            # in y_true, computed with numpy only.
            y_true = np.asarray(y_true)
            recalls = [
                np.mean(y_pred_b[y_true == label] == label)
                for label in np.unique(y_true)
            ]
            return float(np.mean(recalls))
        raise ValueError(f'Metric {self.metric_} is not supported.')
def load_dataset(dataset_name):
    """Load the train/validation/test splits of a dataset from the data folder.

    Parameters
    ----------
    dataset_name : string
        Name of the target dataset.
        The splits are read from ``data/{dataset_name}_train.csv``,
        ``data/{dataset_name}_valid.csv`` and ``data/{dataset_name}_test.csv``.
        Every column except the last one is used as a feature; the labels
        are read from the column named 'label'.

    Returns
    ----------
    X_train, y_train, X_valid, y_valid, X_test, y_test
        The underlying numpy arrays (``.values``) of the features / labels
        of each split, in that order.
    """
    arrays = []
    for split in ('train', 'valid', 'test'):
        frame = pd.read_csv(f'data/{dataset_name}_{split}.csv')
        feature_columns = frame.columns.tolist()[:-1]
        features = frame[feature_columns]
        labels = frame['label']
        arrays.append((features, labels))
    (X_train, y_train), (X_valid, y_valid), (X_test, y_test) = arrays
    return (
        X_train.values, y_train.values,
        X_valid.values, y_valid.values,
        X_test.values, y_test.values,
    )
def histogram_error_distribution(y_true, y_pred, bins):
    """Compute the histogram of absolute prediction errors.

    Parameters
    ----------
    y_true : array-like of shape = [n_samples]
        The ground truth labels.

    y_pred : array-like of shape = [n_samples]
        The predict probabilities.

    bins : int, number of bins in the histogram
        Passed straight to ``np.histogram``; no explicit range is given.

    Returns
    ----------
    hist : array-like of shape = [bins]
        Number of samples whose absolute error falls into each bin.
    """
    abs_error = np.absolute(y_true - y_pred)
    # np.histogram returns (counts, bin_edges); only the counts are needed.
    counts = np.histogram(abs_error, bins=bins)[0]
    return counts
def gaussian_prob(x, mu, sigma):
    """Evaluate the Gaussian (normal) density function at x.

    Parameters
    ----------
    x : float or numpy array
        Input value(s); arrays are evaluated element-wise.

    mu : float
        Parameter mu (mean) of the Gaussian function.

    sigma : float
        Parameter sigma (standard deviation) of the Gaussian function.

    Returns
    ----------
    output : float or numpy array, matching x
    """
    normalizer = 1 / (sigma * np.sqrt(2*np.pi))
    standardized = (x-mu)/sigma
    exponent = -0.5*np.power(standardized, 2)
    return normalizer * np.exp(exponent)
def meta_sampling(y_pred, y_true, X, n_under_samples, mu, sigma, random_state=None):
    """The meta-sampling process in MESA.

    Each instance is weighted by the Gaussian function evaluated at its
    absolute prediction error, and a subset is drawn according to these
    weights.

    Parameters
    ----------
    y_pred : array-like of shape = [n_samples]
        The predict probabilities.

    y_true : array-like of shape = [n_samples]
        The ground truth labels.

    X : array-like of shape = [n_samples, n_features]
        The original data to be meta-sampled.

    n_under_samples : int, <= n_samples
        The expected number of instances in the subset after meta-sampling.

    mu : float
        Parameter mu of the Gaussian function.

    sigma : float
        Parameter sigma of the Gaussian function.

    random_state : int or None, optional (default=None)
        Forwarded to ``DataFrame.sample`` as the random seed / state.

    Returns
    ----------
    X_subset : pandas DataFrame with n_under_samples rows
        The sampled rows of X (wrapped in a DataFrame).
    """
    abs_error = np.absolute(y_true - y_pred)
    weights = gaussian_prob(abs_error, mu, sigma)
    frame = pd.DataFrame(X)
    return frame.sample(
        n_under_samples,
        weights=weights,
        random_state=random_state,
    )
def imbalance_train_test_split(X, y, test_size, random_state=None):
    """Train/test split that keeps the class distribution in both parts.

    The data of every class is split separately with ``train_test_split``
    (same ``test_size`` and ``random_state``), and the per-class pieces are
    concatenated in the order of ``np.unique(y)``.

    Parameters
    ----------
    X : array-like of shape = [n_samples, n_features]
        Data to split; indexed with a boolean mask per class.

    y : array-like of shape = [n_samples]
        Class labels.

    test_size : float or int
        Forwarded to ``train_test_split``.

    random_state : int or None, optional (default=None)
        Forwarded to ``train_test_split``.

    Returns
    ----------
    X_train, X_test, y_train, y_test : numpy arrays
    """
    per_class = []
    for cls in np.unique(y):
        mask = (y == cls)
        per_class.append(
            train_test_split(
                X[mask], y[mask],
                test_size=test_size, random_state=random_state,
            )
        )
    # Each entry is [X_train, X_test, y_train, y_test] for one class.
    X_train = np.concatenate([part[0] for part in per_class])
    X_test = np.concatenate([part[1] for part in per_class])
    y_train = np.concatenate([part[2] for part in per_class])
    y_test = np.concatenate([part[3] for part in per_class])
    return X_train, X_test, y_train, y_test
def state_scale(state, scale):
    """Scale up the meta-states.

    The state is normalized by its sum and then multiplied by ``2 * scale``,
    so the returned array sums to ``2 * scale``.
    """
    total = state.sum()
    normalized = state / total
    return normalized * 2 * scale
def memory_init_fulfill(args, memory):
    """Initialize the memory with synthetic transitions.

    Three prototype states are built from Gaussian curves over the error
    bins, and ``int(args.replay_size / 3)`` noisy transitions are pushed for
    each of them with the actions 0.9, 0.5 and 0.1 respectively.

    Parameters
    ----------
    args : object
        Must provide ``num_bins``, ``replay_size`` and ``reward_coefficient``.

    memory : object
        Must provide ``push(state, action, reward, next_state, <last>)``;
        the last argument is always passed as 0 here
        (NOTE(review): presumably a done/mask flag - confirm against the
        memory implementation).

    Returns
    ----------
    memory : the same object, after the pushes.
    """
    n_bins = args.num_bins
    capacity = args.replay_size
    bin_errors = np.linspace(0, 1, n_bins)
    # Used as the sigma argument of gaussian_prob for all three curves.
    spread = 0.3

    high_error = gaussian_prob(bin_errors, 1, spread)
    mid_error = gaussian_prob(bin_errors, 0.5, spread)
    low_error = gaussian_prob(bin_errors, 0, spread)

    # (prototype state, action) pairs, kept in the push order:
    # under-fitting, learning, over-fitting.
    prototypes = (
        (state_scale(np.concatenate([high_error, high_error]), n_bins), 0.9),
        (state_scale(np.concatenate([mid_error, mid_error]), n_bins), 0.5),
        (state_scale(np.concatenate([low_error, mid_error]), n_bins), 0.1),
    )

    noise_scale = 0.5
    per_prototype = int(capacity/3)
    for base_state, action in prototypes:
        for _ in range(per_prototype):
            # Draw the noise for state first, then next_state, so the global
            # numpy RNG is consumed in the same order as before.
            state = base_state + np.random.rand(n_bins*2) * noise_scale
            next_state = base_state + np.random.rand(n_bins*2) * noise_scale
            memory.push(state, [action], args.reward_coefficient * 0.05, next_state, 0)
    return memory
def transform(y):
    """Expand binary targets/probabilities into a two-column array.

    A 1-D array is first turned into a single column. A single-column array
    is then expanded to ``[1 - y, y]``. Arrays that already have more than
    one column are returned unchanged.
    """
    column = y.reshape(-1, 1) if y.ndim == 1 else y
    if column.shape[1] != 1:
        return column
    return np.concatenate((1-column, column), axis=1)
def cross_entropy(y_pred, y_true, epsilon=1e-4):
    """Per-sample cross-entropy error.

    Parameters
    ----------
    y_pred : numpy array
        Predicted probabilities; clipped to [epsilon, 1 - epsilon] before the
        logarithm is taken.

    y_true : numpy array
        Ground truth targets.

    epsilon : float, optional (default=1e-4)
        Clipping margin that keeps the logarithm finite.

    Returns
    ----------
    loss : numpy array of shape = [n_samples]
        Sum over columns of ``-y_true * log(y_pred)`` after both inputs have
        been passed through ``transform``.
    """
    clipped = np.clip(y_pred, epsilon, 1 - epsilon)
    probs = transform(clipped)
    targets = transform(y_true)
    pointwise = -targets*np.log(probs)
    return pointwise.sum(axis=1)
def slide_mean(data, window_half):
    """Slide mean for better visualization.

    Parameters
    ----------
    data : sequence (list or 1-D numpy array)
        The values to smooth.

    window_half : int
        Number of neighbours taken on each side of a position; the window
        is truncated at both ends of the sequence.

    Returns
    ----------
    result : list of float, same length as data
        ``result[i]`` is the mean of ``data[i-window_half : i+window_half+1]``
        restricted to valid indices.
    """
    n = len(data)
    result = []
    for i in range(n):
        lower_bound = max(i - window_half, 0)
        # The slice end is exclusive, so it must be capped at n (not n - 1);
        # otherwise the last element is never included and the final window
        # can be empty.
        upper_bound = min(i + window_half + 1, n)
        result.append(np.mean(data[lower_bound:upper_bound]))
    return result