Merge pull request #102 from zillow/batch-model-bug-fixes
Luminaire Package Upgrade
sayanchk authored Feb 11, 2022
2 parents 8c48e15 + cce13b7 commit 48505ca
Showing 14 changed files with 156 additions and 37 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/github-pages.yml
@@ -11,10 +11,10 @@ jobs:
     steps:
     - uses: actions/checkout@v2

-    - name: Setup Python
+    - name: Set up Python 3.10
       uses: actions/setup-python@v2
       with:
-        python-version: '3.7'
+        python-version: '3.10'

     - name: Install dependencies
       run: |
4 changes: 2 additions & 2 deletions .github/workflows/python-app.yml
@@ -13,10 +13,10 @@ jobs:
     steps:
     - uses: actions/checkout@v2

-    - name: Set up Python 3.7
+    - name: Set up Python 3.10
       uses: actions/setup-python@v2
       with:
-        python-version: '3.7'
+        python-version: '3.10'

     - name: Cache Dependencies
       id: cache
4 changes: 2 additions & 2 deletions .github/workflows/python-publish.yml
@@ -15,10 +15,10 @@ jobs:
     steps:
     - uses: actions/checkout@v2

-    - name: Set up Python 3.7
+    - name: Set up Python 3.10
       uses: actions/setup-python@v2
       with:
-        python-version: '3.7'
+        python-version: '3.10'

     - name: Install dependencies
       run: |
5 changes: 5 additions & 0 deletions docs/tutorial/dataprofiling.rst
@@ -8,6 +8,11 @@ Luminaire *DataExploration* implements different exploratory data analysis to de

Luminaire data exploration and profiling runs two different workflows. The impute only option in profiling performs imputation for any missing data in the input time series and does not run any profiling to generate insights from the input time series.

.. NOTE::
    If duplicate dates are present in the input time series, data profiling processes them by averaging
    the duplicates and merging them into a single date.
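
A minimal sketch of the averaging behavior described in this note, using plain pandas (the values and the *raw* column are illustrative, not taken from the Luminaire docs):

>>> import pandas as pd
>>> idx = pd.to_datetime(['2020-01-01', '2020-01-01', '2020-01-02'])
>>> pd.DataFrame({'raw': [10.0, 20.0, 40.0]}, index=idx).groupby(level=0).mean()
             raw
2020-01-01  15.0
2020-01-02  40.0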


>>> from luminaire.exploration.data_exploration import DataExploration
>>> data
raw
7 changes: 7 additions & 0 deletions docs/tutorial/outlier_batch.rst
@@ -34,6 +34,13 @@ These *preprocessing_parameters* are used for training the structural model.
>>> print(success, model_date, model)
(True, '2020-06-07 00:00:00', <luminaire_models.model.lad_structural.LADStructuralModel object at 0x7f97e127d320>)

.. NOTE::
    Luminaire can run model validation during the training process. This validation checks for any underfit
    that might have occurred due to a misspecification of the model hyperparameters or due to nonstationarity.
    The validation flag is set to False by default but can be turned on by setting *validate = True* during
    training. This option is only available for the *LADStructuralModel* class. Refer to the API reference for
    more information.
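
For illustration, validation can be enabled in the training call from the example above; the object and variable names below are placeholders rather than the tutorial's exact ones:

>>> success, model_date, model = model_object.train(data=training_data, validate=True, **preprocessing_parameters)

When validation fails, *success* is returned as False and the training result carries an 'ErrorMessage' (see the *train* changes in luminaire/model/lad_structural.py below).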

The trained model works as a data-driven source of truth to evaluate any future time series values to be monitored. The *score* method is used to check whether new data points are anomalous.

>>> model.score(2000, '2020-06-08')
32 changes: 28 additions & 4 deletions luminaire/exploration/data_exploration.py
@@ -130,9 +130,24 @@ def __init__(self,
self.tc_max_window_length = 24

def add_missing_index(self, df=None, freq=None):
"""
This function reindexes a pandas dataframe with missing dates for a given time series frequency.
Note: If duplicate dates dates are present in the dataframe, this function takes average of the duplicate
data dates and merges them as a single data date.
:param pandas.DataFrame df: Input pandas dataframe containing the time series
:param str freq: The frequency of the time-series. A `Pandas offset`_ such as 'D', 'H', or 'M'
:return: pandas dataframe after reindexing missing data dates
:rtype: pandas.DataFrame
"""

import pandas as pd

# Group duplicate index entries and average them into a single date
df = df.groupby(df.index).mean()

# Create a new pandas date range from the first to the last valid index,
# using the frequency specified by the user
idx = pd.date_range(start=df.first_valid_index(), end=df.last_valid_index(), freq=freq)
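
As a quick illustration of the two steps above (a sketch with made-up values, not part of the commit):

import pandas as pd

idx = pd.to_datetime(['2022-01-01', '2022-01-01', '2022-01-03'])
demo = pd.DataFrame({'raw': [10.0, 20.0, 40.0]}, index=idx)
demo = demo.groupby(demo.index).mean()  # the duplicate 2022-01-01 rows average to 15.0
full_idx = pd.date_range(start=demo.first_valid_index(), end=demo.last_valid_index(), freq='D')
demo = demo.reindex(full_idx)  # the missing 2022-01-02 reappears as NaN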
@@ -798,14 +813,20 @@ def kf_naive_outlier_detection(self, input_series, idx_position):
from pykalman import KalmanFilter

kf = KalmanFilter()
-filtered_state_means, filtered_state_covariance = kf.em(input_series).filter(input_series)
-
-residuals = input_series - filtered_state_means[:,0]
+truncated_series = input_series[-(self.min_ts_length * 3):] \
+    if idx_position == len(input_series) - 1 else input_series
+idx_position = len(truncated_series) - 1
+
+filtered_state_means, filtered_state_covariance = kf.em(truncated_series).filter(truncated_series)
+
+residuals = truncated_series - filtered_state_means[:, 0]

# Catching marginal anomalies to avoid during training
is_anomaly = residuals[idx_position] < np.mean(residuals) \
-    - (2.5 * np.sqrt(filtered_state_covariance)[idx_position][0][0]) \
+    - (1 * np.sqrt(filtered_state_covariance)[idx_position][0][0]) \
    or residuals[idx_position] > np.mean(residuals) \
-    + (2.5 * np.sqrt(filtered_state_covariance)[idx_position][0][0])
+    + (1 * np.sqrt(filtered_state_covariance)[idx_position][0][0])

return is_anomaly
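
A self-contained sketch of the truncated Kalman-filter check above, assuming pykalman is installed; the series and the min_ts_length value of 10 are made up:

import numpy as np
from pykalman import KalmanFilter

min_ts_length = 10
series = np.append(np.random.normal(0, 1, 50), 8.0)  # spike at the last position
truncated_series = series[-(min_ts_length * 3):]  # keep only the most recent window
kf = KalmanFilter()
means, covs = kf.em(truncated_series).filter(truncated_series)
residuals = truncated_series - means[:, 0]
idx = len(truncated_series) - 1
band = 1 * np.sqrt(covs)[idx][0][0]  # the 1-sigma band used above
is_anomaly = residuals[idx] < np.mean(residuals) - band or residuals[idx] > np.mean(residuals) + band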

@@ -882,6 +903,9 @@ def _prepare(self, df, impute_only, streaming=False, **kwargs):
df = df.iloc[-min(max_ts_length, len(df)):]
df = self._truncate_by_data_gaps(df=df, target_metric=target_metric)

if len(df) < min_ts_length:
raise ValueError("Due to a recent data gap, training is waiting for more data to populate")

if not streaming and len(df) < min_ts_length:
raise ValueError("The training data observed continuous missing data near the end. Require more stable "
"data to train")
90 changes: 80 additions & 10 deletions luminaire/model/lad_structural.py
@@ -237,7 +237,7 @@ def _seasonal_arima(self, endog=None, exog=None, p=None, d=None, q=None, imodels
"""

import numpy as np
-import statsmodels.tsa.arima_model as arima
+import statsmodels.tsa.arima.model as arima

# Extract the exogenous variable generated based on (imodels * 2) number of most significant
# frequencies
@@ -257,13 +257,7 @@ def _seasonal_arima(self, endog=None, exog=None, p=None, d=None, q=None, imodels

try:
stepwise_fit.append(arima.ARIMA(endog=endog, exog=exog,
-order=(p, d, q)).fit(seasonal=False, trace=False,
-                     method='css',
-                     solver='bfgs',
-                     error_action='ignore',
-                     stepwise_fit=True,
-                     warn_convergence=False,
-                     disp=False))
+order=(p, d, q)).fit(method='statespace'))
except Exception as e:
raise LADStructuralError(message=str(e))

@@ -442,12 +436,77 @@ def _training(self, data, ts_start, ts_end, min_ts_length=None, min_ts_mean=None

return result, order

-def train(self, data, optimize=False, **kwargs):
+def _validate_model(self, data, hyper_params, result):
"""
This function validates a newly trained model before publishing. The validation checks for underfit due to
misspecification of hyperparameters or nonstationarity.
:param pandas.DataFrame data: Input time series data
:param dict hyper_params: Hyperparameters for model training
:param dict result: Training outputs
:return: A validation flag indicating whether the model can be published. The function returns True when the
validation is successful, False otherwise.
:rtype: bool
"""

import numpy as np
import scipy.stats as st

levene_alpha = 0.05
grubb_alpha = 0.001
f_test_alpha = 0.05
mwu_alpha = 0.0005
passed = True

model = LADStructuralModel(hyper_params=self.hyper_params, **result)

freq = "1" + result['freq'] if not any(char.isdigit() for char in result['freq']) else result['freq']

baseline = data[self._imputed_metric][-self.max_scoring_length:].values
baseline = (np.exp(baseline) - 1).tolist() if hyper_params['is_log_transformed'] else baseline.tolist()

predictions = []
pred_date = pd.Timestamp(result['training_end_date'])
for i in range(len(baseline)):
pred_date = pred_date + pd.Timedelta(freq)
result = model.score(baseline[i], pred_date)
if result['Success']:
predictions.append(result['Prediction'])

# We perform the Levene test, the Mann-Whitney U test, and the Grubbs test between the predictions and
# the baseline to understand whether they are widely different. The two arrays need to be of the same
# length in order to perform these tests.
if len(predictions) == len(baseline):
N = len(predictions)

pvalue_levene = st.levene(predictions, baseline)[1]
pvalue_levene_diff = st.levene(np.diff(predictions), np.diff(baseline))[1]
pvalue_mwu = st.mannwhitneyu(predictions, baseline)[1] if predictions != baseline else 1.0

grubb_test_stat = max(abs(np.array(predictions) - np.mean(predictions))) / np.std(predictions, ddof=1)
t_cutoff = st.t.ppf((grubb_alpha / (2 * N)), df=N - 2)
grubb_test = grubb_test_stat > ((N - 1) / np.sqrt(N)) * np.sqrt((t_cutoff**2) / (N - 2 + (t_cutoff**2)))

if grubb_test:
passed = False
if np.std(baseline, ddof=1) > 0:
if (not np.isnan(pvalue_levene) and not np.isnan(pvalue_levene_diff) and not np.isnan(pvalue_mwu)) and \
(pvalue_levene < levene_alpha or pvalue_levene_diff < levene_alpha):
F = (np.std(predictions, ddof=1) ** 2) / (np.std(baseline, ddof=1) ** 2)
f_test_pval = 1 - st.f.cdf(F, N - 1, len(baseline) - 1)
if f_test_pval < f_test_alpha and pvalue_mwu < mwu_alpha:
passed = False

return passed
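
For reference, a standalone sketch of the Grubbs-style check computed above (alpha mirrors grubb_alpha; the sample data is made up):

import numpy as np
import scipy.stats as st

def grubbs_flag(values, alpha=0.001):
    # Two-sided Grubbs test: flag when the largest deviation from the mean
    # exceeds the critical value at the given significance level.
    values = np.asarray(values, dtype=float)
    N = len(values)
    stat = max(abs(values - values.mean())) / values.std(ddof=1)
    t_cutoff = st.t.ppf(alpha / (2 * N), df=N - 2)
    return stat > ((N - 1) / np.sqrt(N)) * np.sqrt(t_cutoff ** 2 / (N - 2 + t_cutoff ** 2))

grubbs_flag(list(range(20)) + [200.0])  # True: 200.0 is flagged as an outlier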

def train(self, data, optimize=False, validate=False, **kwargs):
"""
This function trains a structural LAD model for a given time series.
:param pandas.DataFrame data: Input time series data
:param bool optimize: Flag to identify whether called from hyperparameter optimization
:param bool validate: Flag to identify whether to run model validation after training
:return: success flag, the model date and the trained lad structural model object
:rtype: tuple[bool, str, LADStructuralModel object]
Expand Down Expand Up @@ -493,6 +552,13 @@ def train(self, data, optimize=False, **kwargs):

success = False if 'ErrorMessage' in result else True

# Model validation step to check for underfit due to misspecification of hyperparameters or other issues.
if success and validate and not optimize:
passed = self._validate_model(data=data, hyper_params=self.hyper_params, result=result)
if not passed:
success = False
result['ErrorMessage'] = 'Model validation failed! Check any issues with the input data or the hyper-parameters.'

return success, kwargs['ts_end'], LADStructuralModel(hyper_params=self.hyper_params, **result)

@classmethod
@@ -569,7 +635,11 @@ def _predict(cls, model, is_log_transformed,
else:
pred_exog['fourier_feature'] = seasonal_feature_scoring[:forecast_ndays]

-forecast = list(model.forecast(steps=forecast_ndays, alpha=alpha, exog=pred_exog))
+forecast = []
+forecast_obj = model.get_forecast(steps=forecast_ndays, alpha=alpha, exog=pred_exog)
+forecast.append(np.real(forecast_obj.predicted_mean).tolist())  # adding the prediction mean to the forecast
+forecast.append(np.real(forecast_obj.se_mean).tolist())  # adding the prediction standard error to the forecast
+forecast.append(np.real(forecast_obj.conf_int()).tolist())  # adding the prediction confidence intervals to the forecast
interpolated_training_data = list(zip(*training_tail))[1]

for order in list(reversed(range(order_of_diff))):
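
A minimal sketch of the statsmodels get_forecast API that the new scoring code relies on (statsmodels >= 0.13; the AR(1) fit on a random walk is illustrative):

import numpy as np
import statsmodels.tsa.arima.model as arima

y = np.cumsum(np.random.normal(0, 1, 200))
fit = arima.ARIMA(endog=y, order=(1, 1, 0)).fit(method='statespace')
forecast_obj = fit.get_forecast(steps=5)
mean = forecast_obj.predicted_mean  # point forecasts
se = forecast_obj.se_mean  # forecast standard errors
ci = forecast_obj.conf_int(alpha=0.05)  # 95% confidence intervals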
9 changes: 9 additions & 0 deletions luminaire/model/model_utils.py
@@ -58,6 +58,8 @@ class LADHolidays(pandas.tseries.holiday.AbstractHolidayCalendar):
offset=_DateOffset(weekday=_H.SU(1)))
Easter = _H.Holiday('Easter', month=1, day=1, offset=[_H.Easter(), _H.Day(0)])

USThanksgivingWednesday = _H.Holiday('Thanksgiving Wednesday', month=11, day=1,
offset=[_DateOffset(weekday=_H.TH(4)), _DateOffset(days=-1)])
USThanksgivingDay = _H.Holiday('Thanksgiving', month=11, day=1,
offset=_DateOffset(weekday=_H.TH(4)))
USThanksgivingFriday = _H.Holiday('Thanksgiving Friday', month=11, day=1,
@@ -67,6 +69,7 @@ class LADHolidays(pandas.tseries.holiday.AbstractHolidayCalendar):
USThanksgivingSunday = _H.Holiday('Thanksgiving Sunday', month=11, day=1,
offset=[_DateOffset(weekday=_H.TH(4)), _DateOffset(days=3)])

Dec23 = _H.Holiday('December 23', month=12, day=23)
ChristmasEve = _H.Holiday('Christmas Eve', month=12, day=24)
ChristmasDay = _H.Holiday('Christmas Day', month=12, day=25)
ChristmasDayObserved = _H.Holiday('Christmas Day Observed', month=12, day=25, observance=_H.nearest_workday)
@@ -79,6 +82,8 @@ class LADHolidays(pandas.tseries.holiday.AbstractHolidayCalendar):

NewYearsDay = _H.Holiday('New Years Day', month=1, day=1)
NewYearsDayObserved = _H.Holiday('New Years Day Observed', month=1, day=1, observance=_H.nearest_workday)
Jan2 = _H.Holiday('January 2', month=1, day=2)
Jan3 = _H.Holiday('January 3', month=1, day=3)

rules = [
USMemorialDay,
@@ -105,10 +110,12 @@ class LADHolidays(pandas.tseries.holiday.AbstractHolidayCalendar):
Superbowl,
Easter,

USThanksgivingWednesday,
USThanksgivingDay,
USThanksgivingFriday,
USThanksgivingSaturday,
USThanksgivingSunday,
Dec23,
ChristmasEve,
ChristmasDay,
ChristmasDayObserved,
Expand All @@ -120,6 +127,8 @@ class LADHolidays(pandas.tseries.holiday.AbstractHolidayCalendar):
Dec31,
NewYearsDay,
NewYearsDayObserved,
Jan2,
Jan3,
]

def __init__(self, name=None, holiday_rules=None):
24 changes: 13 additions & 11 deletions luminaire/model/window_density.py
@@ -102,14 +102,16 @@ def _volume_shift_detection(self, mean_list=None, sd_list=None, probability_thre
:rtype: int
"""
import numpy as np
-from bayesian_changepoint_detection import offline_changepoint_detection as offcd
+from bayesian_changepoint_detection.priors import const_prior
+from bayesian_changepoint_detection.bayesian_models import offline_changepoint_detection
+import bayesian_changepoint_detection.offline_likelihoods as offline_ll
from functools import partial

# Volume shift detection over the means of the training window
-q, p, pcp = offcd.offline_changepoint_detection(
+q, p, pcp = offline_changepoint_detection(
    data=np.array(mean_list),
-    prior_func=partial(offcd.const_prior, l=(len(mean_list) + 1)),
-    observation_log_likelihood_function=offcd.gaussian_obs_log_likelihood,
+    prior_function=partial(const_prior, p=1/(len(mean_list) + 1)),
+    log_likelihood_class=offline_ll.StudentT(),
truncate=-10)

mask_mean = np.append(0, np.exp(pcp).sum(0)) > probability_threshold
Expand All @@ -118,10 +120,10 @@ def _volume_shift_detection(self, mean_list=None, sd_list=None, probability_thre
change_points = np.array(mask_mean).nonzero()
last_mean_cp = change_points[0][-1] if len(change_points[0]) > 0 else []

-q, p, pcp = offcd.offline_changepoint_detection(
+q, p, pcp = offline_changepoint_detection(
    data=np.array(sd_list),
-    prior_func=partial(offcd.const_prior, l=(len(sd_list) + 1)),
-    observation_log_likelihood_function=offcd.gaussian_obs_log_likelihood,
+    prior_function=partial(const_prior, p=1/(len(sd_list) + 1)),
+    log_likelihood_class=offline_ll.StudentT(),
truncate=-10)

mask_sd = np.append(0, np.exp(pcp).sum(0)) > probability_threshold
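
A runnable sketch of the new offline changepoint API, assuming the pinned git revision of bayesian_changepoint_detection from requirements.txt (the synthetic mean series is made up):

import numpy as np
from functools import partial
from bayesian_changepoint_detection.priors import const_prior
from bayesian_changepoint_detection.bayesian_models import offline_changepoint_detection
import bayesian_changepoint_detection.offline_likelihoods as offline_ll

means = np.concatenate([np.random.normal(0, 1, 30), np.random.normal(5, 1, 30)])
q, p, pcp = offline_changepoint_detection(
    data=means,
    prior_function=partial(const_prior, p=1 / (len(means) + 1)),
    log_likelihood_class=offline_ll.StudentT(),
    truncate=-10)
change_prob = np.append(0, np.exp(pcp).sum(0))  # per-index changepoint probability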
@@ -155,8 +157,8 @@ def _distance_function(self, data=None, called_for=None, baseline=None):
if called_for == "training":
distance = []
for i in range(0, len(data) - 1):
-q = stats.kde.gaussian_kde(data[i])
-p = stats.kde.gaussian_kde(data[i + 1])
+q = stats.gaussian_kde(data[i])
+p = stats.gaussian_kde(data[i + 1])

ts_min = min(np.min(data[i]), np.min(data[i + 1]))
ts_max = max(np.max(data[i]), np.max(data[i + 1]))
Expand All @@ -176,8 +178,8 @@ def _distance_function(self, data=None, called_for=None, baseline=None):

# If called for scoring, Kl divergence is performed between the scoring window and the baseline
elif called_for == "scoring":
-q = stats.kde.gaussian_kde(baseline)
-p = stats.kde.gaussian_kde(data)
+q = stats.gaussian_kde(baseline)
+p = stats.gaussian_kde(data)

ts_min = min(np.min(baseline), np.min(data))
ts_max = max(np.max(baseline), np.max(data))
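
A short sketch of the KDE comparison above: both densities are evaluated over a shared grid spanning the combined range, and the divergence is computed on that grid (grid size and data are made up):

import numpy as np
from scipy import stats

baseline = np.random.normal(0, 1, 500)
window = np.random.normal(0.5, 1, 500)
q = stats.gaussian_kde(baseline)
p = stats.gaussian_kde(window)
grid = np.linspace(min(baseline.min(), window.min()), max(baseline.max(), window.max()), 200)
kl_divergence = stats.entropy(pk=p(grid), qk=q(grid))  # KL(p || q) on the grid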
Binary file modified luminaire/tests/models/lad_structural_model
Binary file modified luminaire/tests/models/lad_structural_model_log_seasonal
Binary file modified luminaire/tests/models/window_density_model
6 changes: 3 additions & 3 deletions requirements.txt
@@ -1,10 +1,10 @@
-bayesian-changepoint-detection>=0.2.dev1
+bayesian-changepoint-detection @ git+https://github.com/hildensia/bayesian_changepoint_detection@2dd95f5c1d028116899a842ccb3baa173f9d5be9
changepy>=0.3.1
hyperopt>=0.1.2
numpy>=1.17.5
pandas>=0.25.3
pykalman>=0.9.5
-scipy<=1.2.3
-statsmodels>=0.10.2,<=0.12.2
+scipy>=1.6.0
+statsmodels>=0.13.0
scikit-learn>=0.24.2
decorator>=5.1.0