diff --git a/luminaire/exploration/data_exploration.py b/luminaire/exploration/data_exploration.py
index 3b4192d..4b758fb 100644
--- a/luminaire/exploration/data_exploration.py
+++ b/luminaire/exploration/data_exploration.py
@@ -875,6 +875,7 @@ def _prepare(self, df, impute_only, streaming=False, **kwargs):
         freq = self.freq
         freq_delta = pd.Timedelta("1" + freq) if not any(char.isdigit() for char in str(freq)) else pd.Timedelta(freq)
 
+        df.index = pd.DatetimeIndex(df.index)
         df = self.add_missing_index(df=df, freq=freq_delta)
 
         if not streaming:
diff --git a/luminaire/model/window_density.py b/luminaire/model/window_density.py
index 090e0b3..b809e7c 100644
--- a/luminaire/model/window_density.py
+++ b/luminaire/model/window_density.py
@@ -505,7 +505,9 @@ def _get_result(self, input_df=None, detrend_order=None, agg_data_model=None, va
         """
         The function scores the scoring window for anomalies based on the training metrics and the baseline
         :param pandas.DataFrame input_df: Input data containing the training and the scoring data.
-        :param int detrend_order: The order of detrending based on MA or differencing method.
+        :param int detrend_order: The non-negative order of detrending based on the modeling or differencing method.
+        When detrend_order > 0, the corresponding detrending needs to be performed using the method specified in the
+        model config.
         :param luminaire.model.lad_structural.LADStructuralModel agg_data_model: Prediction model for aggregated data.
         :param str value_column: Column containing the values.
         :param str detrend_method: Selects between "modeling" or "diff" detrend method.
@@ -532,6 +534,10 @@ def _get_result(self, input_df=None, detrend_order=None, agg_data_model=None, va
 
         is_anomaly = False
         execution_data = input_df[value_column]
+        adjusted_execution_data = []
+        prob_of_anomaly = []
+        len_req_agg_data_model = 42    # Setting a hard threshold to have predictions from aggregated data
+                                       # for stationarity adjustment
 
         if detrend_method == 'diff':
             # Obtain the execution data and perform the necessary differencing
@@ -545,23 +551,49 @@ def _get_result(self, input_df=None, detrend_order=None, agg_data_model=None, va
             execution_data_avg = np.mean(execution_data)
             # If detrending is needed, we scale the scoring data accordingly using the agg_dat_model forecast
             if detrend_order > 0:
+                snapshot_len_max = min(len(agg_data), len_req_agg_data_model)
+                agg_data_trunc = np.array(agg_data)[:, 1][-snapshot_len_max:]
+                data_adjust_forecast = []
                 try:
-                    data_adjust_forecast = agg_data_model.score(execution_data_avg, scoring_datetime)['Prediction'] \
-                        if agg_data_model else agg_data[-1][-1]
+                    # Setting the data adjustment window of the original data using the predictions and the CILower and
+                    # CIUpper keeping the prediction uncertainty of the agg_model in mind
+                    if agg_data_model and len(agg_data) > len_req_agg_data_model:
+                        score = agg_data_model.score(execution_data_avg, scoring_datetime)
+                        data_adjust_forecast.append(score['Prediction'])
+                        data_adjust_forecast.append(score['CILower'])
+                        data_adjust_forecast.append(score['CIUpper'])
+                    else:
+                        data_adjust_forecast.append(np.median(agg_data_trunc))
+                        data_adjust_forecast.append(np.percentile(agg_data_trunc, 5))    # setting a 2-sigma limit
+                        data_adjust_forecast.append(np.percentile(agg_data_trunc, 95))   # setting a 2-sigma limit
                 except:
                     # If the scoring for the agg_data_model fails for some reason, we use the latest agg_data for the
                     # detrending adjustment
-                    data_adjust_forecast = agg_data[-1][-1]
-                adjusted_execution_data = (execution_data / data_adjust_forecast).tolist()
+                    data_adjust_forecast.append(np.median(agg_data_trunc))
+                    data_adjust_forecast.append(np.percentile(agg_data_trunc, 5))    # setting a 2-sigma limit
+                    data_adjust_forecast.append(np.percentile(agg_data_trunc, 95))   # setting a 2-sigma limit
+                for i in range(3):
+                    if data_adjust_forecast[i] != 0:
+                        adjusted_execution_data.append((execution_data / data_adjust_forecast[i]).tolist())
             else:
                 adjusted_execution_data = list(execution_data)
 
         # Kl divergence based anomaly detection
         if detection_method == "kldiv":
-            current_anomaly_score = self._distance_function(data=adjusted_execution_data,
-                                                            called_for="scoring", baseline=baseline)
-            prob_of_anomaly = st.gamma.cdf(current_anomaly_score, anomaly_scores_gamma_alpha,
-                                           anomaly_scores_gamma_loc, anomaly_scores_gamma_beta)
+            if detrend_order > 0:
+                prob_of_anomaly = []
+                for i in range(3):
+                    current_anomaly_score = self._distance_function(data=adjusted_execution_data[i],
+                                                                    called_for="scoring", baseline=baseline)
+                    prob_of_anomaly.append(st.gamma.cdf(current_anomaly_score, anomaly_scores_gamma_alpha,
+                                                        anomaly_scores_gamma_loc, anomaly_scores_gamma_beta))
+                prob_of_anomaly = np.min(prob_of_anomaly)
+            else:
+                current_anomaly_score = self._distance_function(data=adjusted_execution_data,
+                                                                called_for="scoring", baseline=baseline)
+                prob_of_anomaly = st.gamma.cdf(current_anomaly_score, anomaly_scores_gamma_alpha,
+                                               anomaly_scores_gamma_loc, anomaly_scores_gamma_beta)
+
             if 1 - prob_of_anomaly < self.sig_level:
                 is_anomaly = True
         # Sign test based anomaly detection
@@ -578,20 +610,25 @@ def _get_result(self, input_df=None, detrend_order=None, agg_data_model=None, va
             # based test for the past standard deviations to detect anomalies
             elif baseline_type == "aggregated":
                 baseline_sds = np.array(baseline).std(1).tolist()
-                baseline_execution_data = copy.copy(baseline)
-                baseline_execution_data.append(adjusted_execution_data)
-                pca = PCA()
-                scores = pca.fit_transform(StandardScaler().fit_transform(baseline_execution_data))
-                robust_cov = MinCovDet().fit(scores[:, :3])
-                mahalanobis_distance = robust_cov.mahalanobis(scores[:, :3])
-                pvalue_mahalanobis = 1 - st.chi2.cdf(mahalanobis_distance[-1],
-                                                     np.array(baseline_execution_data).shape[1])
-
-                gamma_alpha, gamma_loc, gamma_beta = st.gamma.fit(baseline_sds)
-                pvalue_gamma = 1 - st.gamma.cdf(np.std(adjusted_execution_data), gamma_alpha, gamma_loc, gamma_beta)
-                if pvalue_mahalanobis < self.sig_level or pvalue_gamma < self.sig_level:
-                    is_anomaly = True
-                prob_of_anomaly = 1 - min(pvalue_mahalanobis, pvalue_gamma)
+                if detrend_order == 0:
+                    # creating a 2d list to make it easy to loop through in the following for loop
+                    adjusted_execution_data = [adjusted_execution_data]
+                for current_adjusted_data in adjusted_execution_data:
+                    baseline_execution_data = copy.copy(baseline)
+                    baseline_execution_data.append(current_adjusted_data)
+                    pca = PCA()
+                    scores = pca.fit_transform(StandardScaler().fit_transform(baseline_execution_data))
+                    robust_cov = MinCovDet().fit(scores[:, :3])
+                    mahalanobis_distance = robust_cov.mahalanobis(scores[:, :3])    # getting the top 3 dimensions
+                    pvalue_mahalanobis = 1 - st.chi2.cdf(mahalanobis_distance[-1],
+                                                         np.array(baseline_execution_data).shape[1])
+
+                    gamma_alpha, gamma_loc, gamma_beta = st.gamma.fit(baseline_sds)
+                    pvalue_gamma = 1 - st.gamma.cdf(np.std(current_adjusted_data), gamma_alpha, gamma_loc, gamma_beta)
+                    if pvalue_mahalanobis < self.sig_level or pvalue_gamma < self.sig_level:
+                        is_anomaly = True
+                    prob_of_anomaly.append(1 - min(pvalue_mahalanobis, pvalue_gamma))
+                prob_of_anomaly = np.min(prob_of_anomaly)
 
         return is_anomaly, prob_of_anomaly
 
diff --git a/setup.py b/setup.py
index e233451..9bacd46 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,7 @@
 setup(
     name='luminaire',
-    version='0.2.2',
+    version='0.2.3',
     license='Apache License 2.0',
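Note on the data_exploration.py hunk above: the added df.index = pd.DatetimeIndex(df.index) coerces the frame's index to a proper DatetimeIndex before add_missing_index reindexes it against a fixed frequency; with a plain string (object) index, reindexing against datetime labels matches nothing and blanks out the data. A minimal pandas sketch of the same coercion, where the date_range/reindex pair is only an assumed stand-in for what add_missing_index does internally, not the library code:

    import pandas as pd

    # Index arrives as plain strings, e.g. after a CSV/JSON round trip.
    df = pd.DataFrame({'raw': [1.0, 3.0]}, index=['2020-01-01', '2020-01-03'])

    df.index = pd.DatetimeIndex(df.index)      # coerce to datetimes first
    full_index = pd.date_range(df.index.min(), df.index.max(), freq=pd.Timedelta('1D'))
    df = df.reindex(full_index)                # the missing 2020-01-02 row becomes NaN

    print(df)

Without the coercion, only the genuinely missing timestamps should be NaN, but a string-labelled index would make every row come back NaN after the reindex.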
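Note on the window_density.py changes above: when detrend_order > 0, the scoring window is now rescaled three ways, by the aggregated model's Prediction, CILower and CIUpper (or, as a fallback, by the median and the 5th/95th percentiles of a recent snapshot of the aggregated data), and the reported probability of anomaly is the minimum across the rescaled series, which keeps the detector conservative when the aggregated forecast itself is uncertain. A standalone sketch of that idea, with adjusted_anomaly_probability and toy_score_fn as hypothetical names rather than Luminaire APIs:

    import numpy as np

    def adjusted_anomaly_probability(execution_data, adjustments, score_fn):
        """Rescale the scoring window by each adjustment factor and keep the most
        conservative (minimum) probability of anomaly across the rescaled series."""
        execution_data = np.asarray(execution_data, dtype=float)
        probs = []
        for factor in adjustments:
            if factor != 0:                    # skip degenerate scaling factors
                probs.append(score_fn(execution_data / factor))
        return float(np.min(probs)) if probs else 0.0

    # Fallback adjustments from a recent snapshot of aggregated data, as in the patch.
    agg_data_trunc = np.array([10.2, 9.8, 10.5, 11.0, 9.9, 10.1])
    adjustments = [np.median(agg_data_trunc),
                   np.percentile(agg_data_trunc, 5),
                   np.percentile(agg_data_trunc, 95)]

    def toy_score_fn(series):
        # Stand-in for the KL-divergence / gamma-CDF scoring step of the model.
        return float(np.clip(np.std(series), 0.0, 1.0))

    print(adjusted_anomaly_probability([10.4, 10.0, 10.7], adjustments, toy_score_fn))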