Skip to content

Commit

Permalink
Merge pull request #93 from zillow/bug/invalid-input-handling
Browse files Browse the repository at this point in the history
Fixed a scoring failure that occurred while reindexing missing data
  • Loading branch information
sayanchk authored Aug 12, 2021
2 parents 9803a21 + 1c9b985 commit 6d66c2a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 24 deletions.
1 change: 1 addition & 0 deletions luminaire/exploration/data_exploration.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ def _prepare(self, df, impute_only, streaming=False, **kwargs):
freq = self.freq

freq_delta = pd.Timedelta("1" + freq) if not any(char.isdigit() for char in str(freq)) else pd.Timedelta(freq)
df.index = pd.DatetimeIndex(df.index)
df = self.add_missing_index(df=df, freq=freq_delta)

if not streaming:
Expand Down
83 changes: 60 additions & 23 deletions luminaire/model/window_density.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,9 @@ def _get_result(self, input_df=None, detrend_order=None, agg_data_model=None, va
"""
The function scores the scoring window for anomalies based on the training metrics and the baseline
:param pandas.DataFrame input_df: Input data containing the training and the scoring data.
:param int detrend_order: The order of detrending based on MA or differencing method.
:param int detrend_order: The non-negative order of detrending based on the modeling or differencing method. When
detrend_order > 0, the corresponding detrending needs to be performed using the method specified in the model
config.
:param luminaire.model.lad_structural.LADStructuralModel agg_data_model: Prediction model for aggregated data.
:param str value_column: Column containing the values.
:param str detrend_method: Selects between "modeling" or "diff" detrend method.
Expand All @@ -532,6 +534,10 @@ def _get_result(self, input_df=None, detrend_order=None, agg_data_model=None, va

is_anomaly = False
execution_data = input_df[value_column]
adjusted_execution_data = []
prob_of_anomaly = []
len_req_agg_data_model = 42 # Setting a hard threshold to have predictions from aggregated data
# for stationarity adjustment

if detrend_method == 'diff':
# Obtain the execution data and perform the necessary differencing
Expand All @@ -545,23 +551,49 @@ def _get_result(self, input_df=None, detrend_order=None, agg_data_model=None, va
execution_data_avg = np.mean(execution_data)
# If detrending is needed, we scale the scoring data accordingly using the agg_data_model forecast
if detrend_order > 0:
snapshot_len_max = min(len(agg_data), len_req_agg_data_model)
agg_data_trunc = np.array(agg_data)[:, 1][-snapshot_len_max:]
data_adjust_forecast = []
try:
data_adjust_forecast = agg_data_model.score(execution_data_avg, scoring_datetime)['Prediction'] \
if agg_data_model else agg_data[-1][-1]
# Setting the data adjustment window of the original data using the predictions and the CILower and
# CIUpper keeping the prediction uncertainty of the agg_model in mind
if agg_data_model and len(agg_data) > len_req_agg_data_model:
score = agg_data_model.score(execution_data_avg, scoring_datetime)
data_adjust_forecast.append(score['Prediction'])
data_adjust_forecast.append(score['CILower'])
data_adjust_forecast.append(score['CIUpper'])
else:
data_adjust_forecast.append(np.median(agg_data_trunc))
data_adjust_forecast.append(np.percentile(agg_data_trunc, 5)) # setting a 2-sigma limit
data_adjust_forecast.append(np.percentile(agg_data_trunc, 95)) # setting a 2-sigma limit
except:
# If the scoring for the agg_data_model fails for some reason, we use the latest agg_data for the
# detrending adjustment
data_adjust_forecast = agg_data[-1][-1]
adjusted_execution_data = (execution_data / data_adjust_forecast).tolist()
data_adjust_forecast.append(np.median(agg_data_trunc))
data_adjust_forecast.append(np.percentile(agg_data_trunc, 5)) # setting a 2-sigma limit
data_adjust_forecast.append(np.percentile(agg_data_trunc, 95)) # setting a 2-sigma limit
for i in range(3):
if data_adjust_forecast[i] != 0:
adjusted_execution_data.append((execution_data / data_adjust_forecast[i]).tolist())
else:
adjusted_execution_data = list(execution_data)

# Kl divergence based anomaly detection
if detection_method == "kldiv":
current_anomaly_score = self._distance_function(data=adjusted_execution_data,
called_for="scoring", baseline=baseline)
prob_of_anomaly = st.gamma.cdf(current_anomaly_score, anomaly_scores_gamma_alpha,
anomaly_scores_gamma_loc, anomaly_scores_gamma_beta)
if detrend_order > 0:
prob_of_anomaly = []
for i in range(3):
current_anomaly_score = self._distance_function(data=adjusted_execution_data[i],
called_for="scoring", baseline=baseline)
prob_of_anomaly.append(st.gamma.cdf(current_anomaly_score, anomaly_scores_gamma_alpha,
anomaly_scores_gamma_loc, anomaly_scores_gamma_beta))
prob_of_anomaly = np.min(prob_of_anomaly)
else:
current_anomaly_score = self._distance_function(data=adjusted_execution_data,
called_for="scoring", baseline=baseline)
prob_of_anomaly = st.gamma.cdf(current_anomaly_score, anomaly_scores_gamma_alpha,
anomaly_scores_gamma_loc, anomaly_scores_gamma_beta)

if 1 - prob_of_anomaly < self.sig_level:
is_anomaly = True
# Sign test based anomaly detection
Expand All @@ -578,20 +610,25 @@ def _get_result(self, input_df=None, detrend_order=None, agg_data_model=None, va
# based test for the past standard deviations to detect anomalies
elif baseline_type == "aggregated":
baseline_sds = np.array(baseline).std(1).tolist()
baseline_execution_data = copy.copy(baseline)
baseline_execution_data.append(adjusted_execution_data)
pca = PCA()
scores = pca.fit_transform(StandardScaler().fit_transform(baseline_execution_data))
robust_cov = MinCovDet().fit(scores[:, :3])
mahalanobis_distance = robust_cov.mahalanobis(scores[:, :3])
pvalue_mahalanobis = 1 - st.chi2.cdf(mahalanobis_distance[-1],
np.array(baseline_execution_data).shape[1])

gamma_alpha, gamma_loc, gamma_beta = st.gamma.fit(baseline_sds)
pvalue_gamma = 1 - st.gamma.cdf(np.std(adjusted_execution_data), gamma_alpha, gamma_loc, gamma_beta)
if pvalue_mahalanobis < self.sig_level or pvalue_gamma < self.sig_level:
is_anomaly = True
prob_of_anomaly = 1 - min(pvalue_mahalanobis, pvalue_gamma)
if detrend_order == 0:
# creating a 2d list to make it easy to loop through in the following for loop
adjusted_execution_data = [adjusted_execution_data]
for current_adjusted_data in adjusted_execution_data:
baseline_execution_data = copy.copy(baseline)
baseline_execution_data.append(current_adjusted_data)
pca = PCA()
scores = pca.fit_transform(StandardScaler().fit_transform(baseline_execution_data))
robust_cov = MinCovDet().fit(scores[:, :3])
mahalanobis_distance = robust_cov.mahalanobis(scores[:, :3]) # getting the top 3 dimensions
pvalue_mahalanobis = 1 - st.chi2.cdf(mahalanobis_distance[-1],
np.array(baseline_execution_data).shape[1])

gamma_alpha, gamma_loc, gamma_beta = st.gamma.fit(baseline_sds)
pvalue_gamma = 1 - st.gamma.cdf(np.std(current_adjusted_data), gamma_alpha, gamma_loc, gamma_beta)
if pvalue_mahalanobis < self.sig_level or pvalue_gamma < self.sig_level:
is_anomaly = True
prob_of_anomaly.append(1 - min(pvalue_mahalanobis, pvalue_gamma))
prob_of_anomaly = np.min(prob_of_anomaly)

return is_anomaly, prob_of_anomaly

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

setup(
name='luminaire',
version='0.2.2',
version='0.2.3',

license='Apache License 2.0',

Expand Down

0 comments on commit 6d66c2a

Please sign in to comment.