"""Auto ARIMA transformer is a time series transformer that predicts target using ARIMA models."""
# For more information about the python ARIMA package
# please visit https://www.alkaline-ml.com/pmdarima/index.html
import importlib
import numpy as np
import pandas as pd
import datatable as dt
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from h2oaicore.systemutils import make_experiment_logger, loggerinfo, loggerwarning
from h2oaicore.transformer_utils import CustomTimeSeriesTransformer
from h2oaicore.separators import orig_feat_prefix, extra_prefix


class MyAutoArimaTransformer(CustomTimeSeriesTransformer):
_binary = False
_multiclass = False
_modules_needed_by_name = ['pmdarima==1.8.3']
_included_model_classes = None
    _testing_can_skip_failure = False  # ensure it is tested as if it should not fail
_lag_recipe_allowed = True
_causal_recipe_allowed = False
_unsupervised = False # uses target
_uses_target = True # uses target

    @staticmethod
def get_default_properties():
return dict(col_type="time_column", min_cols=1, max_cols=1, relative_importance=1)
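    # The transformer itself is assigned only the experiment's time column
    # (min_cols=max_cols=1); the time group columns are read from self.tgc
    # at fit/transform time.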

    @staticmethod
def can_use(accuracy, interpretability, **kwargs):
        return False  # Auto ARIMA is too slow to enable by default, but it can still run when it is the only transformer selected

    def fit(self, X: dt.Frame, y: np.array = None):
"""
Fits ARIMA models (1 per time group) using historical target values contained in y
:param X: Datatable frame containing the features
:param y: numpy array containing the historical values of the target
:return: self
"""
# Import the ARIMA python module
pm = importlib.import_module('pmdarima')
self.scalers = None
logger = self._get_experiment_logger()
# 0. Preliminary steps
tgc_wo_time = list(np.setdiff1d(self.tgc, self.time_column))
X = X[:, self.tgc].to_pandas()
# Fill NaNs or None
X = X.replace([None, np.nan], 0)
        # Add the target; the label encoder only matters for classification, which this transformer does not support
if self.labels is not None:
y = LabelEncoder().fit(self.labels).transform(y)
X['y'] = np.array(y)
        # 1. Fit a general scaler to make predictions for unknown groups
X.rename(columns={self.time_column: "ds"}, inplace=True)
self.general_scaler = MinMaxScaler(feature_range=(1, 2)).fit(X[['y', 'ds']].groupby('ds').median().values)
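        # This scaler is fitted on per-date medians of the target across all groups;
        # at prediction time it inverts the scaling for groups never seen in training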
        # 2. Scale the target for each individual group
        # Go through the groups and min-max scale them
X['y_skl'] = self.scale_target_per_time_group(X, tgc_wo_time, logger)
        # 3. Make time a pandas datetime series so that we can order it
X['ds'] = pd.to_datetime(X['ds'], format=self.datetime_formats[self.time_column])
        # 4. Fit a model on averages
X_avg = X[['ds', 'y_skl']].groupby('ds').mean().reset_index()
order = np.argsort(X_avg['ds'])
try:
self.avg_model = pm.auto_arima(X_avg['y_skl'].values[order], error_action='ignore', seasonal=False)
except Exception as e:
            loggerwarning(logger, "ARIMA: Average model error : {}".format(e))
self.avg_model = None
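        # The average model's predictions later serve as a fallback for groups
        # that end up without a usable per-group model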
        # 5. Fit one ARIMA model per time group
self.models = {}
# Go through groups
        for grp_col in tgc_wo_time:
            loggerinfo(logger, "Auto ARIMA : fitting models for group column %s" % grp_col)
            # Group the data by the current time group column
            X_groups = X[['ds', 'y_skl', grp_col]].groupby(grp_col)
            nb_groups = len(X_groups)
for _i_g, (key, X_grp) in enumerate(X_groups):
# Just say where we are in the fitting process
if (_i_g + 1) % max(1, nb_groups // 20) == 0:
loggerinfo(logger, "Auto ARIMA : %d%% of groups fitted" % (100 * (_i_g + 1) // nb_groups))
# Average over dates
X_grp = X_grp.groupby('ds')['y_skl'].mean().reset_index()
grp_hash = self.get_hash(grp_col, key)
X_grp['ds'] = pd.to_datetime(X_grp['ds'], format=self.datetime_formats[self.time_column])
order = np.argsort(X_grp['ds'])
try:
model = pm.auto_arima(X_grp['y_skl'].values[order], error_action='ignore', seasonal=False)
except Exception as e:
                    loggerwarning(logger, "Auto ARIMA warning: {}".format(e))
model = None
self.models[grp_hash] = model
return self

    def get_hash(self, col='', key=None):
        # Build the dict key under which a group's scaler or model is stored
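        # Illustrative example (hypothetical values): for group column 'store'
        # and groupby key ('A', 3), get_hash('store', ('A', 3)) returns 'store_A_3'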
if isinstance(key, tuple):
key = [col] + list(key)
elif isinstance(key, list):
pass
else:
# Not tuple, not list
key = [col, key]
grp_hash = '_'.join(map(str, key))
return grp_hash

    def scale_target_per_time_group(self, X, tgc_wo_time, logger):
loggerinfo(logger, 'Start of group scaling')
if len(tgc_wo_time) > 0:
X_groups = X.groupby(tgc_wo_time)
else:
X_groups = [([None], X)]
if self.scalers is None:
self.scalers = {}
scaled_ys = []
for key, X_grp in X_groups:
# Create dict key to store the min max scaler
grp_hash = self.get_hash(key)
# Scale target for current group
self.scalers[grp_hash] = MinMaxScaler(feature_range=(1, 2))
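                # feature_range=(1, 2) keeps the scaled target strictly positive,
                # presumably to avoid degenerate all-zero series for ARIMA
                # (rationale assumed; the original recipe does not state it)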
y_skl = self.scalers[grp_hash].fit_transform(X_grp[['y']].values)
# Put back in a DataFrame to keep track of original index
y_skl_df = pd.DataFrame(y_skl, columns=['y'])
y_skl_df.index = X_grp.index
scaled_ys.append(y_skl_df)
else:
scaled_ys = []
for key, X_grp in X_groups:
# Create dict key to store the min max scaler
grp_hash = self.get_hash(key)
# Scale target for current group
y_skl = self.scalers[grp_hash].transform(X_grp[['y']].values)
# Put back in a DataFrame to keep track of original index
y_skl_df = pd.DataFrame(y_skl, columns=['y'])
y_skl_df.index = X_grp.index
scaled_ys.append(y_skl_df)
loggerinfo(logger, 'End of group scaling')
return pd.concat(tuple(scaled_ys), axis=0)

    def transform(self, X: dt.Frame, **kwargs):
"""
Uses fitted models (1 per time group) to predict the target
        If self.is_train exists, it means we are doing in-sample predictions;
        if it does not, ARIMA is used to predict the future
:param X: Datatable Frame containing the features
:return: ARIMA predictions
"""
logger = self._get_experiment_logger()
# 0. Preliminary steps
tgc_wo_time = list(np.setdiff1d(self.tgc, self.time_column))
X = X[:, self.tgc].to_pandas()
# Fill NaNs or None
X = X.replace([None, np.nan], 0)
X.rename(columns={self.time_column: "ds"}, inplace=True)
X['ds'] = pd.to_datetime(X['ds'], format=self.datetime_formats[self.time_column])
# 1. Predict with average model
if self.avg_model is not None:
X_time = X[['ds']].groupby('ds').first().reset_index()
if hasattr(self, 'is_train'):
yhat = self.avg_model.predict_in_sample()
else:
yhat = self.avg_model.predict(n_periods=self.pred_gap + X_time.shape[0])
# Assign predictions the same order the dates had
yhat = yhat[self.pred_gap:]
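                # Illustrative numbers (assumption): with pred_gap=7 and 14 dates to
                # score, 21 periods are requested and the first 7, which fall in the
                # gap between training end and test start, are discarded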
X_time.sort_values('ds', inplace=True)
X_time['yhat'] = yhat
X_time.sort_index(inplace=True)
# Merge back the average prediction to all similar timestamps
indices = X.index
X = pd.merge(
left=X,
right=X_time[['ds', 'yhat']],
on='ds',
how='left'
)
X.index = indices
else:
X['yhat'] = np.nan
y_avg_model = X['yhat'].values
y_predictions = pd.DataFrame(y_avg_model, columns=['average_pred'])
# 2. Predict for individual group
# Go through groups
        for grp_col in tgc_wo_time:
            # Group the rows by the current time group column
            X_groups = X[['ds', grp_col]].groupby(grp_col)
nb_groups = len(X_groups)
dfs = []
for _i_g, (key, X_grp) in enumerate(X_groups):
                # Just say where we are in the transform process
if (_i_g + 1) % max(1, nb_groups // 20) == 0:
loggerinfo(logger, "Auto ARIMA : %d%% of groups transformed" % (100 * (_i_g + 1) // nb_groups))
grp_hash = self.get_hash(grp_col, key)
try:
model = self.models[grp_hash]
except KeyError:
model = None
# Find unique datetime
X_time = X_grp[['ds']].groupby('ds').first().reset_index()
X_time['ds'] = pd.to_datetime(X_time['ds'], format=self.datetime_formats[self.time_column])
X_time = X_time.sort_values('ds')
if model is not None:
# Get predictions from ARIMA model, make sure we include prediction gaps
                    if hasattr(self, 'is_train'):
                        # In-sample predictions can be shorter than the series the model was trained on
                        pred = model.predict_in_sample()
                        tmp = np.zeros(X_time.shape[0])
                        tmp[:len(pred)] = pred
                        X_time['yhat'] = tmp
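                        # Any timestamps beyond predict_in_sample's range keep the
                        # zero padding (in scaled space) rather than becoming NaN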
                    else:
                        # ARIMA forecasts a given number of periods ahead, so request
                        # enough periods to cover the prediction gap plus the horizon,
                        # then drop the gap
                        yhat = model.predict(n_periods=self.pred_gap + X_time.shape[0])
                        X_time['yhat'] = yhat[self.pred_gap:]
# Now merge back the predictions into X_grp
indices = X_grp.index
X_grp = pd.merge(
left=X_grp,
right=X_time[['ds', 'yhat']],
on='ds',
how='left'
)
X_grp.index = indices
else:
X_grp = X_grp.copy()
X_grp['yhat'] = np.nan
dfs.append(X_grp['yhat'])
y_predictions[f'{grp_col}_pred'] = pd.concat(dfs, axis=0)
        # Now invert the scaling of all these predictions
for grp_col in tgc_wo_time:
# Add time group to the predictions, will be used to invert scaling
y_predictions[grp_col] = X[grp_col].copy()
# Fill NaN
y_predictions[f'{grp_col}_pred'] = y_predictions[f'{grp_col}_pred'].fillna(y_predictions['average_pred'])
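        # Rows whose group had no usable model were left as NaN above and now
        # fall back to the average model's prediction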
        # Go through the groups and invert the target scaling for known groups
if len(tgc_wo_time) > 0:
X_groups = y_predictions.groupby(tgc_wo_time)
else:
X_groups = [([None], y_predictions)]
for _f in [f'{grp_col}_pred' for grp_col in tgc_wo_time] + ['average_pred']:
inverted_ys = []
for key, X_grp in X_groups:
grp_hash = self.get_hash(key)
# Scale target for current group
                if grp_hash in self.scalers:
inverted_y = self.scalers[grp_hash].inverse_transform(X_grp[[_f]])
else:
inverted_y = self.general_scaler.inverse_transform(X_grp[[_f]])
# Put back in a DataFrame to keep track of original index
inverted_df = pd.DataFrame(inverted_y, columns=[_f])
inverted_df.index = X_grp.index
inverted_ys.append(inverted_df)
y_predictions[_f] = pd.concat(tuple(inverted_ys), axis=0).sort_index()[_f]
y_predictions.drop(tgc_wo_time, axis=1, inplace=True)
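        # y_predictions now holds one 'average_pred' column plus one
        # '<grp_col>_pred' column per time group column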
if kwargs.get('is_fit', False):
self._output_feature_names = [f'{self.display_name}{orig_feat_prefix}{self.time_column}{extra_prefix}{_f}'
for _f in y_predictions]
self._feature_desc = self._output_feature_names
return y_predictions

    def fit_transform(self, X: dt.Frame, y: np.array = None, **kwargs):
"""
Fits the ARIMA models (1 per time group) and outputs the corresponding predictions
:param X: Datatable Frame
        :param y: Target to be used to fit the ARIMA model and predict in-sample
:return: in-sample ARIMA predictions
"""
# Flag the fact we are doing in-sample predictions
self.is_train = True
ret = self.fit(X, y).transform(X, is_fit=True)
del self.is_train
return ret

    def update_history(self, X: dt.Frame, y: np.array = None):
"""
Update the model fit with additional observed endog/exog values.
Updating an ARIMA adds new observations to the model, updating the MLE of the parameters
accordingly by performing several new iterations (maxiter) from the existing model parameters.
:param X: Datatable Frame containing input features
:param y: Numpy array containing new observations to update the ARIMA model
:return:
"""
X = X.to_pandas()
XX = X[self.tgc].copy()
XX['y'] = np.array(y)
        tgc_wo_time = list(np.setdiff1d(self.tgc, self.time_column))
        # Group by each time group column, using the same hashing scheme as fit
        # so that the stored models can actually be looked up
        for grp_col in tgc_wo_time:
            for key, X_grp in XX.groupby(grp_col):
                grp_hash = self.get_hash(grp_col, key)
                model = self.models.get(grp_hash)
                if model is not None:
                    order = np.argsort(X_grp[self.time_column])
                    model.update(X_grp['y'].values[order])
return self

    def _get_experiment_logger(self):
# Get the logger if it exists
logger = None
if self.context and self.context.experiment_id:
logger = make_experiment_logger(
experiment_id=self.context.experiment_id,
tmp_dir=self.context.tmp_dir,
experiment_tmp_dir=self.context.experiment_tmp_dir
)
return logger
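

# ---------------------------------------------------------------------------
# Minimal sketch of the pmdarima calls this recipe relies on. The toy series
# below is an assumption for illustration and is not part of the recipe:
#
#   import numpy as np
#   import pmdarima as pm
#
#   y = np.sin(np.linspace(0, 20, 200)) + np.random.normal(0, 0.1, 200)
#   model = pm.auto_arima(y, error_action='ignore', seasonal=False)
#   insample = model.predict_in_sample()  # fitted values over the history
#   future = model.predict(n_periods=14)  # forecast the next 14 periods
#   model.update(np.random.normal(1, 0.1, 7))  # fold in newly observed values
# ---------------------------------------------------------------------------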