From 9c4963ceb254b5f052d387cef4397c9071269140 Mon Sep 17 00:00:00 2001 From: Maisa Ben Salah <76703998+MaiBe-ctrl@users.noreply.github.com> Date: Tue, 3 Sep 2024 20:16:34 +0200 Subject: [PATCH] [Major] Speedup dataset get_item (#1636) * updated dataset get_item * fixed linting issues * make targets contiguous * fixed ruff warnings * Unpack incrementally when needed * adjust forecaster * separate unpacking logic * added featureExtractor class * separate packing logic * fixed liniting issues * fixed covariates * added features extractor * rename classes and functions * remove prints in time_net * init Stacker in forecaster * fix time-dataset * remove last _create component staker * fix lagged config * uncomment glocal tests * fix ruff * uncomment future reg tests * comment test --------- Co-authored-by: ourownstory --- .../global_local_modeling_fut_regr.ipynb | 2 +- .../feature-guides/global_local_trend.ipynb | 2 +- .../network_architecture_visualization.ipynb | 4 +- .../prophet_to_torch_prophet.ipynb | 3 +- neuralprophet/configure.py | 5 + neuralprophet/data/process.py | 4 +- neuralprophet/forecaster.py | 102 +++++- neuralprophet/time_dataset.py | 340 +++-------------- neuralprophet/time_net.py | 344 ++++++++++-------- neuralprophet/utils_time_dataset.py | 306 ++++++++++++++++ tests/test_future_regressor_nn.py | 4 +- tests/test_train_config.py | 10 +- tests/test_unit.py | 72 +++- 13 files changed, 718 insertions(+), 480 deletions(-) create mode 100644 neuralprophet/utils_time_dataset.py diff --git a/docs/source/how-to-guides/feature-guides/global_local_modeling_fut_regr.ipynb b/docs/source/how-to-guides/feature-guides/global_local_modeling_fut_regr.ipynb index 6f57f1fd6..02cf34719 100644 --- a/docs/source/how-to-guides/feature-guides/global_local_modeling_fut_regr.ipynb +++ b/docs/source/how-to-guides/feature-guides/global_local_modeling_fut_regr.ipynb @@ -751,7 +751,7 @@ "try:\n", " # it already installed dependencies\n", " from torchviz import make_dot\n", - "except:\n", + "except ImportError:\n", " # install graphviz on system\n", " import platform\n", "\n", diff --git a/docs/source/how-to-guides/feature-guides/global_local_trend.ipynb b/docs/source/how-to-guides/feature-guides/global_local_trend.ipynb index 674f75b1b..ca7fd3eff 100644 --- a/docs/source/how-to-guides/feature-guides/global_local_trend.ipynb +++ b/docs/source/how-to-guides/feature-guides/global_local_trend.ipynb @@ -1309,7 +1309,7 @@ "try:\n", " # it already installed dependencies\n", " from torchviz import make_dot\n", - "except:\n", + "except ImportError:\n", " # install graphviz on system\n", " import platform\n", "\n", diff --git a/docs/source/how-to-guides/feature-guides/network_architecture_visualization.ipynb b/docs/source/how-to-guides/feature-guides/network_architecture_visualization.ipynb index 70d99ce3f..65afb79c5 100644 --- a/docs/source/how-to-guides/feature-guides/network_architecture_visualization.ipynb +++ b/docs/source/how-to-guides/feature-guides/network_architecture_visualization.ipynb @@ -41,7 +41,7 @@ " # it already installed dependencies\n", " from torchsummary import summary\n", " from torchviz import make_dot\n", - "except:\n", + "except ImportError:\n", " # install graphviz on system\n", " import platform\n", "\n", @@ -69,7 +69,7 @@ "source": [ "try:\n", " from neuralprophet import NeuralProphet\n", - "except:\n", + "except ImportError:\n", " # if NeuralProphet is not installed yet:\n", " !pip install git+https://github.com/ourownstory/neural_prophet.git\n", " from neuralprophet import 
NeuralProphet" diff --git a/docs/source/how-to-guides/feature-guides/prophet_to_torch_prophet.ipynb b/docs/source/how-to-guides/feature-guides/prophet_to_torch_prophet.ipynb index 20827a015..beb9ccda8 100644 --- a/docs/source/how-to-guides/feature-guides/prophet_to_torch_prophet.ipynb +++ b/docs/source/how-to-guides/feature-guides/prophet_to_torch_prophet.ipynb @@ -240,11 +240,12 @@ "# Set loggers to ERROR level\n", "import logging\n", "import warnings\n", + "from neuralprophet import set_log_level\n", + "\n", "\n", "logging.getLogger(\"prophet\").setLevel(logging.ERROR)\n", "warnings.filterwarnings(\"ignore\")\n", "\n", - "from neuralprophet import set_log_level\n", "\n", "set_log_level(\"ERROR\")" ] diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py index bfd7e5284..0c2a13b52 100644 --- a/neuralprophet/configure.py +++ b/neuralprophet/configure.py @@ -22,6 +22,8 @@ @dataclass class Model: + features_map: dict + lagged_reg_layers: Optional[List[int]] quantiles: Optional[List[float]] = None def setup_quantiles(self): @@ -42,6 +44,9 @@ def setup_quantiles(self): self.quantiles.insert(0, 0.5) +ConfigModel = Model + + @dataclass class Normalization: normalize: str diff --git a/neuralprophet/data/process.py b/neuralprophet/data/process.py index ba2fb6d0e..46e63a67b 100644 --- a/neuralprophet/data/process.py +++ b/neuralprophet/data/process.py @@ -575,7 +575,7 @@ def _handle_missing_data( return df -def _create_dataset(model, df, predict_mode, prediction_frequency=None): +def _create_dataset(model, df, predict_mode, prediction_frequency=None, components_stacker=None): """Construct dataset from dataframe. (Configured Hyperparameters can be overridden by explicitly supplying them. @@ -626,5 +626,7 @@ def _create_dataset(model, df, predict_mode, prediction_frequency=None): config_regressors=model.config_regressors, config_lagged_regressors=model.config_lagged_regressors, config_missing=model.config_missing, + config_model=model.config_model, + components_stacker=components_stacker, # config_train=model.config_train, # no longer needed since JIT tabularization. 
) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index fda36057d..a9fcecc75 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -13,7 +13,17 @@ from matplotlib.axes import Axes from torch.utils.data import DataLoader -from neuralprophet import configure, df_utils, np_types, time_dataset, time_net, utils, utils_lightning, utils_metrics +from neuralprophet import ( + configure, + df_utils, + np_types, + time_dataset, + time_net, + utils, + utils_lightning, + utils_metrics, + utils_time_dataset, +) from neuralprophet.data.process import ( _check_dataframe, _convert_raw_predictions_to_raw_df, @@ -34,6 +44,7 @@ from neuralprophet.plot_model_parameters_plotly import plot_parameters as plot_parameters_plotly from neuralprophet.plot_utils import get_valid_configuration, log_warning_deprecation_plotly, select_plotting_backend from neuralprophet.uncertainty import Conformal +from neuralprophet.utils_time_dataset import ComponentStacker log = logging.getLogger("NP.forecaster") @@ -503,6 +514,8 @@ def __init__( # Model self.config_model = configure.Model( + features_map={}, + lagged_reg_layers=lagged_reg_layers, quantiles=quantiles, ) self.config_model.setup_quantiles() @@ -1067,7 +1080,7 @@ def fit( or any(value != 1 for value in self.num_seasonalities_modelled_dict.values()) ) - ##### Data Setup, and Training Setup ##### + # Data Setup, and Training Setup # Train Configuration: overwrite self.config_train with user provided values if learning_rate is not None: self.config_train.learning_rate = learning_rate @@ -1152,7 +1165,22 @@ def fit( # Set up DataLoaders: Train # Create TimeDataset # Note: _create_dataset() needs to be called after set_auto_seasonalities() - dataset = _create_dataset(self, df, predict_mode=False, prediction_frequency=self.prediction_frequency) + train_components_stacker = utils_time_dataset.ComponentStacker( + n_lags=self.n_lags, + n_forecasts=self.n_forecasts, + max_lags=self.max_lags, + config_seasonality=self.config_seasonality, + lagged_regressor_config=self.config_lagged_regressors, + feature_indices={}, + ) + + dataset = _create_dataset( + self, + df, + predict_mode=False, + prediction_frequency=self.prediction_frequency, + components_stacker=train_components_stacker, + ) # Determine the max_number of epochs self.config_train.set_auto_batch_epoch(n_data=len(dataset)) # Create Train DataLoader @@ -1162,6 +1190,7 @@ def fit( shuffle=True, num_workers=num_workers, ) + self.config_train.set_batches_per_epoch(len(loader)) log.info(f"Train Dataset size: {len(dataset)}") log.info(f"Number of batches per training epoch: {len(loader)}") @@ -1186,7 +1215,15 @@ def fit( ) # df_val, _, _, _ = df_utils.prep_or_copy_df(df_val) df_val = _normalize(df=df_val, config_normalization=self.config_normalization) - dataset_val = _create_dataset(self, df_val, predict_mode=False) + val_components_stacker = utils_time_dataset.ComponentStacker( + n_lags=self.n_lags, + max_lags=self.max_lags, + n_forecasts=self.n_forecasts, + config_seasonality=self.config_seasonality, + lagged_regressor_config=self.config_lagged_regressors, + feature_indices={}, + ) + dataset_val = _create_dataset(self, df_val, predict_mode=False, components_stacker=val_components_stacker) loader_val = DataLoader(dataset_val, batch_size=min(1024, len(dataset_val)), shuffle=False, drop_last=False) # Init the Trainer @@ -1206,12 +1243,16 @@ def fit( if not self.fitted: self.model = self._init_model() + self.model.set_components_stacker(components_stacker=train_components_stacker, 
mode="train") + if validation_enabled: + self.model.set_components_stacker(components_stacker=val_components_stacker, mode="val") + # Find suitable learning rate if not set if self.config_train.learning_rate is None: assert not self.fitted, "Learning rate must be provided for re-training a fitted model." - ## Init a separate Model, Loader and Trainer copy for LR finder (optional, done for safety) - ## Note Leads to a CUDA issue. Needs to be fixed before enabling this feature. + # Init a separate Model, Loader and Trainer copy for LR finder (optional, done for safety) + # Note Leads to a CUDA issue. Needs to be fixed before enabling this feature. # model_lr_finder = self._init_model() # loader_lr_finder = DataLoader( # dataset, @@ -1414,7 +1455,16 @@ def test(self, df: pd.DataFrame, verbose: bool = True): ) df, _, _, _ = df_utils.prep_or_copy_df(df) df = _normalize(df=df, config_normalization=self.config_normalization) - dataset = _create_dataset(self, df, predict_mode=False) + components_stacker = utils_time_dataset.ComponentStacker( + n_lags=self.n_lags, + n_forecasts=self.n_forecasts, + max_lags=self.max_lags, + config_seasonality=self.config_seasonality, + lagged_regressor_config=self.config_lagged_regressors, + feature_indices={}, + ) + dataset = _create_dataset(self, df, predict_mode=False, components_stacker=components_stacker) + self.model.set_components_stacker(components_stacker, mode="test") test_loader = DataLoader(dataset, batch_size=min(1024, len(dataset)), shuffle=False, drop_last=False) # Use Lightning to calculate metrics val_metrics = self.trainer.test(self.model, dataloaders=test_loader, verbose=verbose) @@ -2047,6 +2097,13 @@ def predict_seasonal_components(self, df: pd.DataFrame, quantile: float = 0.5): df = _normalize(df=df, config_normalization=self.config_normalization) df_seasonal = pd.DataFrame() for df_name, df_i in df.groupby("ID"): + feature_unstackor = ComponentStacker( + n_lags=0, + max_lags=0, + n_forecasts=1, + config_seasonality=self.config_seasonality, + lagged_regressor_config=self.config_lagged_regressors, + ) dataset = time_dataset.TimeDataset( df=df_i, predict_mode=True, @@ -2060,25 +2117,29 @@ def predict_seasonal_components(self, df: pd.DataFrame, quantile: float = 0.5): config_regressors=self.config_regressors, config_lagged_regressors=self.config_lagged_regressors, config_missing=self.config_missing, + config_model=self.config_model, + components_stacker=feature_unstackor, # config_train=self.config_train, # no longer needed since JIT tabularization. 
) + self.model.set_components_stacker(feature_unstackor, mode="predict") loader = DataLoader(dataset, batch_size=min(4096, len(df)), shuffle=False, drop_last=False) predicted = {} for name in self.config_seasonality.periods: predicted[name] = list() - for inputs, _, meta in loader: + for inputs_tensor, meta in loader: # Meta as a tensor for prediction if self.model.config_seasonality is None: meta_name_tensor = None elif self.model.config_seasonality.global_local in ["local", "glocal"]: meta = OrderedDict() - meta["df_name"] = [df_name for _ in range(inputs["time"].shape[0])] + time_input = feature_unstackor.unstack_component("time", inputs_tensor) + meta["df_name"] = [df_name for _ in range(time_input.shape[0])] meta_name_tensor = torch.tensor([self.model.id_dict[i] for i in meta["df_name"]]) # type: ignore else: meta_name_tensor = None - + seasonalities_input = feature_unstackor.unstack_component("seasonalities", inputs_tensor) for name in self.config_seasonality.periods: - features = inputs["seasonalities"][name] + features = seasonalities_input[name] quantile_index = self.config_model.quantiles.index(quantile) y_season = torch.squeeze( self.model.seasonality.compute_fourier(features=features, name=name, meta=meta_name_tensor)[ @@ -2880,7 +2941,22 @@ def _predict_raw(self, df, df_name, include_components=False, prediction_frequen assert len(df["ID"].unique()) == 1 if "y_scaled" not in df.columns or "t" not in df.columns: raise ValueError("Received unprepared dataframe to predict. " "Please call predict_dataframe_to_predict.") - dataset = _create_dataset(self, df, predict_mode=True, prediction_frequency=prediction_frequency) + components_stacker = utils_time_dataset.ComponentStacker( + n_lags=self.n_lags, + n_forecasts=self.n_forecasts, + max_lags=self.max_lags, + config_seasonality=self.config_seasonality, + lagged_regressor_config=self.config_lagged_regressors, + feature_indices={}, + ) + dataset = _create_dataset( + self, + df, + predict_mode=True, + prediction_frequency=prediction_frequency, + components_stacker=components_stacker, + ) + self.model.set_components_stacker(components_stacker, mode="predict") loader = DataLoader(dataset, batch_size=min(1024, len(df)), shuffle=False, drop_last=False) if self.n_forecasts > 1: dates = df["ds"].iloc[self.max_lags : -self.n_forecasts + 1] @@ -2893,7 +2969,7 @@ def _predict_raw(self, df, df_name, include_components=False, prediction_frequen self.model.set_covar_weights(self.model.get_covar_weights()) # Compute the predictions and components (if requested) result = self.trainer.predict(self.model, loader) - # Extract the prediction and components + # unstack the prediction and components predicted, component_vectors = zip(*result) predicted = np.concatenate(predicted) diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index 19886e3e4..39364a5b2 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -1,7 +1,7 @@ import logging from collections import OrderedDict from datetime import datetime -from typing import List, Optional +from typing import Optional import numpy as np import pandas as pd @@ -33,6 +33,8 @@ def __init__( config_regressors, config_lagged_regressors, config_missing, + config_model, + components_stacker, ): """Initialize Timedataset from time-series df. 
Parameters @@ -75,6 +77,7 @@ def __init__( self.config_regressors = config_regressors self.config_lagged_regressors = config_lagged_regressors self.config_missing = config_missing + self.config_model = config_model self.max_lags = get_max_num_lags(n_lags=self.n_lags, config_lagged_regressors=self.config_lagged_regressors) if self.max_lags == 0: @@ -109,29 +112,54 @@ def __init__( self.df["ds"] = self.df["ds"].apply(lambda x: x.timestamp()) # Convert to Unix timestamp in seconds self.df_tensors["ds"] = torch.tensor(self.df["ds"].values, dtype=torch.int64) - if self.additive_event_and_holiday_names: - self.df_tensors["additive_event_and_holiday"] = torch.stack( - [self.df_tensors[name] for name in self.additive_event_and_holiday_names], dim=1 - ) - if self.multiplicative_event_and_holiday_names: - self.df_tensors["multiplicative_event_and_holiday"] = torch.stack( - [self.df_tensors[name] for name in self.multiplicative_event_and_holiday_names], dim=1 - ) - - if self.additive_regressors_names: - self.df_tensors["additive_regressors"] = torch.stack( - [self.df_tensors[name] for name in self.additive_regressors_names], dim=1 - ) - if self.multiplicative_regressors_names: - self.df_tensors["multiplicative_regressors"] = torch.stack( - [self.df_tensors[name] for name in self.multiplicative_regressors_names], dim=1 - ) + if self.config_seasonality is not None and hasattr(self.config_seasonality, "periods"): + self.calculate_seasonalities() # Construct index map self.sample2index_map, self.length = self.create_sample2index_map(self.df, self.df_tensors) + self.components_stacker = components_stacker + + self.stack_all_features() + + def stack_all_features(self): + """ + Stack all features into one large tensor by calling individual stacking methods. + """ + feature_list = [] + + current_idx = 0 + + # Call individual stacking functions + current_idx = self.components_stacker.stack_trend_component(self.df_tensors, feature_list, current_idx) + current_idx = self.components_stacker.stack_targets_component(self.df_tensors, feature_list, current_idx) + + current_idx = self.components_stacker.stack_lags_component( + self.df_tensors, feature_list, current_idx, self.n_lags + ) + current_idx = self.components_stacker.stack_lagged_regerssors_component( + self.df_tensors, feature_list, current_idx, self.config_lagged_regressors + ) + current_idx = self.components_stacker.stack_additive_events_component( + self.df_tensors, feature_list, current_idx, self.additive_event_and_holiday_names + ) + current_idx = self.components_stacker.stack_multiplicative_events_component( + self.df_tensors, feature_list, current_idx, self.multiplicative_event_and_holiday_names + ) + current_idx = self.components_stacker.stack_additive_regressors_component( + self.df_tensors, feature_list, current_idx, self.additive_regressors_names + ) + current_idx = self.components_stacker.stack_multiplicative_regressors_component( + self.df_tensors, feature_list, current_idx, self.multiplicative_regressors_names + ) + if self.config_seasonality is not None and hasattr(self.config_seasonality, "periods"): - self.calculate_seasonalities() + current_idx = self.components_stacker.stack_seasonalities_component( + feature_list, current_idx, self.config_seasonality, self.seasonalities + ) + + # Concatenate all features into one big tensor + self.all_features = torch.cat(feature_list, dim=1) # Concatenating along the second dimension def calculate_seasonalities(self): self.seasonalities = OrderedDict({}) @@ -153,21 +181,6 @@ def 
compute_fourier_features(t, period): features *= condition_values self.seasonalities[name] = features - def get_sample_seasonalities(self, df_tensors, origin_index, n_forecasts, max_lags, n_lags, config_seasonality): - seasonalities = OrderedDict({}) - - # Determine the range of indices based on whether lags are used - if max_lags == 0: - indices = [origin_index] - else: - indices = list(range(origin_index - n_lags + 1, origin_index + n_forecasts + 1)) - - # Extract the precomputed seasonalities from self.seasonalities - for name, features in self.seasonalities.items(): - seasonalities[name] = features[indices, :] - - return seasonalities - def __getitem__(self, index): """Overrides parent class method to get an item at index. Parameters @@ -202,22 +215,15 @@ def __getitem__(self, index): # - dataframe positional index is given by position of first target in dataframe for given sample index df_index = self.sample_index_to_df_index(index) - # Tabularize - extract features from dataframe at given target index position - inputs, target = self.tabularize_univariate_datetime_single_index( - df_tensors=self.df_tensors, - origin_index=df_index, - predict_mode=self.predict_mode, - n_lags=self.n_lags, - max_lags=self.max_lags, - n_forecasts=self.n_forecasts, - config_seasonality=self.config_seasonality, - config_lagged_regressors=self.config_lagged_regressors, - additive_event_and_holiday_names=self.additive_event_and_holiday_names, - multiplicative_event_and_holiday_names=self.multiplicative_event_and_holiday_names, - additive_regressors_names=self.additive_regressors_names, - multiplicative_regressors_names=self.multiplicative_regressors_names, - ) - return inputs, target, self.meta + # Extract features from dataframe at given target index position + if self.max_lags > 0: + min_start_index = df_index - self.max_lags + 1 + max_end_index = df_index + self.n_forecasts + 1 + inputs = self.all_features[min_start_index:max_end_index, :] + else: + inputs = self.all_features[df_index, :] + + return inputs, self.meta def __len__(self): """Overrides Parent class method to get data length.""" @@ -294,154 +300,6 @@ def log_input_shapes(self, inputs): tabularized_input_shapes_str += f" {key} {value.shape} \n" log.debug(f"Tabularized inputs shapes: \n{tabularized_input_shapes_str}") - def tabularize_univariate_datetime_single_index( - self, - df_tensors: dict, - origin_index: int, - predict_mode: bool = False, - n_lags: int = 0, - max_lags: int = 0, - n_forecasts: int = 1, - config_seasonality: Optional[configure.ConfigSeasonality] = None, - config_lagged_regressors: Optional[configure.ConfigLaggedRegressors] = None, - additive_event_and_holiday_names: List[str] = [], - multiplicative_event_and_holiday_names: List[str] = [], - additive_regressors_names: List[str] = [], - multiplicative_regressors_names: List[str] = [], - ): - """Create a tabular data sample from timeseries dataframe, used for mini-batch creation. - Note - ---- - Data must have no gaps for sample extracted at given index position. - ---------- - df : pd.DataFrame - Sequence of observations with original ``ds``, ``y`` and normalized ``t``, ``y_scaled`` columns - origin_index: int: - dataframe index position of last observed lag before forecast starts. 
- n_forecasts : int - Number of steps to forecast into future - n_lags : int - Number of lagged values of series to include as model inputs (aka AR-order) - config_seasonality : configure.ConfigSeasonality - Configuration for seasonalities - config_lagged_regressors : configure.ConfigLaggedRegressors - Configurations for lagged regressors - config_events : configure.ConfigEvents - User specified events, each with their upper, lower windows (int) and regularization - config_country_holidays : configure.ConfigCountryHolidays - Configurations (holiday_names, upper, lower windows, regularization) for country specific holidays - config_regressors : configure.ConfigFutureRegressors - Configuration for regressors - predict_mode : bool - Chooses the prediction mode - Options - * (default) ``False``: Includes target values - * ``True``: Does not include targets but includes entire dataset as input - Returns - ------- - OrderedDict - Model inputs, each of len(df) but with varying dimensions - Note - ---- - Contains the following data: - Model Inputs - * ``time`` (np.array, float), dims: (num_samples, 1) - * ``seasonalities`` (OrderedDict), named seasonalities - each with features (np.array, float) - dims: (num_samples, n_features[name]) - * ``lags`` (np.array, float), dims: (num_samples, n_lags) - * ``covariates`` (OrderedDict), named covariates, - each with features (np.array, float) of dims: (num_samples, n_lags) - * ``events`` (OrderedDict), events, - each with features (np.array, float) of dims: (num_samples, n_lags) - * ``regressors`` (OrderedDict), regressors, - each with features (np.array, float) of dims: (num_samples, n_lags) - np.array, float - Targets to be predicted of same length as each of the model inputs, dims: (n_forecasts, 1) - """ - # TODO: pre-process all type conversions (e.g. torch.float32) in __init__ - # Note: if max_lags == 0, then n_forecasts == 1 - - # sample features are stored and returned in OrderedDict - inputs = OrderedDict({}) - - targets = self.get_sample_targets( - df_tensors=df_tensors, - origin_index=origin_index, - n_forecasts=n_forecasts, - max_lags=max_lags, - predict_mode=predict_mode, - ) - - # TIME: the time at each sample's lags and forecasts - if max_lags == 0: - t = df_tensors["t"][origin_index] - inputs["time"] = t.unsqueeze(0) - else: - # extract time value of n_lags steps before and icluding origin_index and n_forecasts steps after origin_index - # Note: df.loc is inclusive of slice end, while df.iloc is not. - t = df_tensors["t"][origin_index - n_lags + 1 : origin_index + n_forecasts + 1] - inputs["time"] = t - - # LAGS: From y-series, extract preceeding n_lags steps up to and including origin_index - if n_lags >= 1 and "y_scaled" in df_tensors: - # Note: df.loc is inclusive of slice end, while df.iloc is not. 
- lags = df_tensors["y_scaled"][origin_index - n_lags + 1 : origin_index + 1] - inputs["lags"] = lags - - # COVARIATES / LAGGED REGRESSORS: Lagged regressor inputs: analogous to LAGS - if ( - config_lagged_regressors is not None and config_lagged_regressors.regressors is not None - ): # and max_lags > 0: - inputs["covariates"] = self.get_sample_lagged_regressors( - df_tensors=df_tensors, origin_index=origin_index, config_lagged_regressors=config_lagged_regressors - ) - - # SEASONALITIES_ - if config_seasonality is not None: - inputs["seasonalities"] = self.get_sample_seasonalities( - df_tensors=df_tensors, - origin_index=origin_index, - n_forecasts=n_forecasts, - max_lags=max_lags, - n_lags=n_lags, - config_seasonality=config_seasonality, - ) - - # FUTURE REGRESSORS: get the future regressors features - # create numpy array of values of additive and multiplicative regressors, at correct indexes - # features dims: (n_forecasts, n_features) - any_future_regressors = 0 < len(additive_regressors_names + multiplicative_regressors_names) - if any_future_regressors: # if config_regressors.regressors is not None: - inputs["regressors"] = self.get_sample_future_regressors( - df_tensors=df_tensors, - origin_index=origin_index, - n_forecasts=n_forecasts, - max_lags=max_lags, - n_lags=n_lags, - additive_regressors_names=additive_regressors_names, - multiplicative_regressors_names=multiplicative_regressors_names, - ) - - # FUTURE EVENTS: get the events features - # create numpy array of values of additive and multiplicative events, at correct indexes - # features dims: (n_forecasts, n_features) - any_events = 0 < len(additive_event_and_holiday_names + multiplicative_event_and_holiday_names) - if any_events: - inputs["events"] = self.get_sample_future_events( - df_tensors=df_tensors, - origin_index=origin_index, - n_forecasts=n_forecasts, - max_lags=max_lags, - n_lags=n_lags, - additive_event_and_holiday_names=additive_event_and_holiday_names, - multiplicative_event_and_holiday_names=multiplicative_event_and_holiday_names, - ) - - # ONLY FOR DEBUGGING - # if log.level == 0: - # log_input_shapes(inputs) - return inputs, targets - def get_event_offset_features(self, event, config, feature): """ Create event offset features for the given event, config and feature @@ -710,88 +568,6 @@ def sort_regressor_names(self, config): multiplicative_regressors_names.append(reg) return additive_regressors_names, multiplicative_regressors_names - def get_sample_targets(self, df_tensors, origin_index, n_forecasts, max_lags, predict_mode): - if predict_mode: - return torch.zeros((n_forecasts, 1), dtype=torch.float32) - else: - if n_forecasts == 1: - if max_lags == 0: - targets = df_tensors["y_scaled"][origin_index] - if max_lags > 0: - targets = df_tensors["y_scaled"][origin_index + 1] - targets = targets.unsqueeze(0).unsqueeze(1) - else: - targets = df_tensors["y_scaled"][origin_index + 1 : origin_index + n_forecasts + 1] - targets = targets.unsqueeze(1) - return targets - - def get_sample_lagged_regressors(self, df_tensors, origin_index, config_lagged_regressors): - lagged_regressors = OrderedDict({}) - # Future TODO: optimize this computation for many lagged_regressors - for name, lagged_regressor in config_lagged_regressors.regressors.items(): - covar_lags = lagged_regressor.n_lags - assert covar_lags > 0 - # Indexing tensors instead of DataFrame - lagged_regressors[name] = df_tensors[name][origin_index - covar_lags + 1 : origin_index + 1] - return lagged_regressors - - def get_sample_future_regressors( - self, - 
df_tensors, - origin_index, - n_forecasts, - max_lags, - n_lags, - additive_regressors_names, - multiplicative_regressors_names, - ): - regressors = OrderedDict({}) - if max_lags == 0: - if additive_regressors_names: - regressors["additive"] = df_tensors["additive_regressors"][origin_index, :].unsqueeze(0) - - if multiplicative_regressors_names: - regressors["multiplicative"] = df_tensors["multiplicative_regressors"][origin_index, :].unsqueeze(0) - - else: - if additive_regressors_names: - regressors["additive"] = df_tensors["additive_regressors"][ - origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : - ] - if multiplicative_regressors_names: - regressors["multiplicative"] = df_tensors["multiplicative_regressors"][ - origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : - ] - - return regressors - - def get_sample_future_events( - self, - df_tensors, - origin_index, - n_forecasts, - max_lags, - n_lags, - additive_event_and_holiday_names, - multiplicative_event_and_holiday_names, - ): - events = OrderedDict({}) - if max_lags == 0: - if additive_event_and_holiday_names: - events["additive"] = df_tensors["additive_event_and_holiday"][origin_index, :].unsqueeze(0) - if multiplicative_event_and_holiday_names: - events["multiplicative"] = df_tensors["multiplicative_event_and_holiday"][origin_index, :].unsqueeze(0) - else: - if additive_event_and_holiday_names: - events["additive"] = df_tensors["additive_event_and_holiday"][ - origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : - ] - if multiplicative_event_and_holiday_names: - events["multiplicative"] = df_tensors["multiplicative_event_and_holiday"][ - origin_index + 1 - n_lags : origin_index + n_forecasts + 1, : - ] - return events - class GlobalTimeDataset(TimeDataset): def __init__( @@ -808,6 +584,8 @@ def __init__( config_regressors, config_lagged_regressors, config_missing, + config_model, + components_stacker, ): """Initialize Timedataset from time-series df. 
Parameters @@ -833,6 +611,8 @@ def __init__( config_regressors=config_regressors, config_lagged_regressors=config_lagged_regressors, config_missing=config_missing, + config_model=config_model, + components_stacker=components_stacker, ) self.length = sum(dataset.length for (name, dataset) in self.datasets.items()) global_sample_to_local_ID = [] diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py index 422ea4f62..336d9bf76 100644 --- a/neuralprophet/time_net.py +++ b/neuralprophet/time_net.py @@ -21,6 +21,7 @@ reg_func_trend, reg_func_trend_glocal, ) +from neuralprophet.utils_time_dataset import ComponentStacker from neuralprophet.utils_torch import init_parameter, interprete_model log = logging.getLogger("NP.time_net") @@ -62,6 +63,10 @@ def __init__( num_seasonalities_modelled: int = 1, num_seasonalities_modelled_dict: dict = None, meta_used_in_model: bool = False, + train_components_stacker: Optional[ComponentStacker] = None, + val_components_stacker: Optional[ComponentStacker] = None, + test_components_stacker: Optional[ComponentStacker] = None, + predict_components_stacker: Optional[ComponentStacker] = None, ): """ Parameters @@ -142,11 +147,16 @@ def __init__( # General self.config_model = config_model self.n_forecasts = n_forecasts + self.train_components_stacker = train_components_stacker + self.val_components_stacker = val_components_stacker + self.test_components_stacker = test_components_stacker + self.predict_components_stacker = predict_components_stacker # Lightning Config self.config_train = config_train self.config_normalization = config_normalization self.compute_components_flag = compute_components_flag + self.config_model = config_model # Manual optimization: we are responsible for calling .backward(), .step(), .zero_grad(). self.automatic_optimization = False @@ -310,6 +320,16 @@ def ar_weights(self) -> torch.Tensor: if isinstance(layer, nn.Linear): return layer.weight + def set_components_stacker(self, components_stacker, mode): + if mode == "train": + self.train_components_stacker = components_stacker + if mode == "val": + self.val_components_stacker = components_stacker + if mode == "test": + self.test_components_stacker = components_stacker + if mode == "predict": + self.predict_components_stacker = components_stacker + def get_covar_weights(self, covar_input=None) -> torch.Tensor: """ Get attributions of covariates network w.r.t. the model input. @@ -500,192 +520,174 @@ def forward_covar_net(self, covariates): x = x.view(x.shape[0], self.n_forecasts, len(self.quantiles)) return x - def forward(self, inputs: Dict, meta: Dict = None, compute_components_flag: bool = False) -> torch.Tensor: - """This method defines the model forward pass. - Note - ---- - Time input is required. Minimum model setup is a linear trend. 
- Parameters - ---------- - inputs : dict - Model inputs, each of len(df) but with varying dimensions - Note - ---- - Contains the following data: - Model Inputs - * ``time`` (torch.Tensor , loat), normalized time, dims: (batch, n_forecasts) - * ``lags`` (torch.Tensor, float), dims: (batch, n_lags) - * ``seasonalities`` (torch.Tensor, float), dict of named seasonalities (keys) with their features - (values), dims of each dict value (batch, n_forecasts, n_features) - * ``covariates`` (torch.Tensor, float), dict of named covariates (keys) with their features - (values), dims of each dict value: (batch, n_lags) - * ``events`` (torch.Tensor, float), all event features, dims (batch, n_forecasts, n_features) - * ``regressors``(torch.Tensor, float), all regressor features, dims (batch, n_forecasts, n_features) - * ``predict_mode`` (bool), optional and only passed during prediction - meta : dict, default=None - Metadata about the all the samples of the model input batch. - Contains the following: - Model Meta: - * ``df_name`` (list, str), time series ID corresponding to each sample of the input batch. - Note - ---- - The meta is sorted in the same way the inputs are sorted. - Note - ---- - The default None value allows the forward method to be used without providing the meta argument. - This was designed to avoid issues with the library `lr_finder` https://github.com/davidtvs/pytorch-lr-finder - while having ``config_trend.trend_global_local="local"``. - The turnaround consists on passing the same meta (dummy ID) to all the samples of the batch. - Internally, this is equivalent to use ``config_trend.trend_global_local="global"`` to find the optimal - learning rate. - compute_components_flag : bool, default=False - If True, components will be computed. + def forward( + self, + input_tensor: torch.Tensor, + components_stacker=ComponentStacker, + meta: Dict = None, + compute_components_flag: bool = False, + predict_mode: bool = False, + ) -> torch.Tensor: + """This method defines the model forward pass.""" - Returns - ------- - torch.Tensor - Forecast of dims (batch, n_forecasts, no_quantiles) - """ - # Turnaround to avoid issues when the meta argument is None and meta_used_in_model + time_input = components_stacker.unstack_component(component_name="time", batch_tensor=input_tensor) + # Handle meta argument if meta is None and self.meta_used_in_model: name_id_dummy = self.id_list[0] meta = OrderedDict() - meta["df_name"] = [name_id_dummy for _ in range(inputs["time"].shape[0])] + meta["df_name"] = [name_id_dummy for _ in range(time_input.shape[0])] meta = torch.tensor([self.id_dict[i] for i in meta["df_name"]], device=self.device) + # Initialize components and nonstationary tensors components = {} additive_components = torch.zeros( - size=(inputs["time"].shape[0], self.n_forecasts, len(self.quantiles)), + size=(time_input.shape[0], self.n_forecasts, len(self.quantiles)), device=self.device, ) additive_components_nonstationary = torch.zeros( - size=(inputs["time"].shape[0], inputs["time"].shape[1], len(self.quantiles)), device=self.device + size=(time_input.shape[0], time_input.shape[1], len(self.quantiles)), + device=self.device, ) multiplicative_components_nonstationary = torch.zeros( - size=(inputs["time"].shape[0], inputs["time"].shape[1], len(self.quantiles)), device=self.device + size=(time_input.shape[0], time_input.shape[1], len(self.quantiles)), + device=self.device, ) - trend = self.trend(t=inputs["time"], meta=meta) + # Unpack time feature and compute trend + trend = 
self.trend(t=time_input, meta=meta) components["trend"] = trend - if "seasonalities" in inputs: - s = self.seasonality(s=inputs["seasonalities"], meta=meta) + # Unpack and process seasonalities + seasonalities_input = None + if self.config_seasonality and self.config_seasonality.periods: + seasonalities_input = components_stacker.unstack_component( + component_name="seasonalities", batch_tensor=input_tensor + ) + s = self.seasonality(s=seasonalities_input, meta=meta) if self.config_seasonality.mode == "additive": additive_components_nonstationary += s elif self.config_seasonality.mode == "multiplicative": multiplicative_components_nonstationary += s components["seasonalities"] = s - if "events" in inputs: - if "additive" in inputs["events"].keys(): - additive_events = self.scalar_features_effects( - inputs["events"]["additive"], self.event_params["additive"] + # Unpack and process events + additive_events_input = None + multiplicative_events_input = None + if self.events_dims is not None: + if "additive_events" in components_stacker.feature_indices: + additive_events_input = components_stacker.unstack_component( + component_name="additive_events", batch_tensor=input_tensor ) + additive_events = self.scalar_features_effects(additive_events_input, self.event_params["additive"]) additive_components_nonstationary += additive_events components["additive_events"] = additive_events - if "multiplicative" in inputs["events"].keys(): + if "multiplicative_events" in components_stacker.feature_indices: + multiplicative_events_input = components_stacker.unstack_component( + component_name="multiplicative_events", batch_tensor=input_tensor + ) multiplicative_events = self.scalar_features_effects( - inputs["events"]["multiplicative"], self.event_params["multiplicative"] + multiplicative_events_input, self.event_params["multiplicative"] ) multiplicative_components_nonstationary += multiplicative_events components["multiplicative_events"] = multiplicative_events - if "regressors" in inputs: - if "additive" in inputs["regressors"].keys(): - additive_regressors = self.future_regressors(inputs["regressors"]["additive"], "additive") - additive_components_nonstationary += additive_regressors - components["additive_regressors"] = additive_regressors - if "multiplicative" in inputs["regressors"].keys(): - multiplicative_regressors = self.future_regressors( - inputs["regressors"]["multiplicative"], "multiplicative" - ) - multiplicative_components_nonstationary += multiplicative_regressors - components["multiplicative_regressors"] = multiplicative_regressors - - # stationarized input - if "lags" in inputs: - # combinde all non-stationary components over AR input range - nonstationary_components = ( # dimensions - [batch, n_lags, median quantile] + # Unpack and process regressors + additive_regressors_input = None + multiplicative_regressors_input = None + if "additive_regressors" in components_stacker.feature_indices: + additive_regressors_input = components_stacker.unstack_component( + component_name="additive_regressors", batch_tensor=input_tensor + ) + additive_regressors = self.future_regressors(additive_regressors_input, "additive") + additive_components_nonstationary += additive_regressors + components["additive_regressors"] = additive_regressors + if "multiplicative_regressors" in components_stacker.feature_indices: + multiplicative_regressors_input = components_stacker.unstack_component( + component_name="multiplicative_regressors", batch_tensor=input_tensor + ) + multiplicative_regressors = 
self.future_regressors(multiplicative_regressors_input, "multiplicative") + multiplicative_components_nonstationary += multiplicative_regressors + components["multiplicative_regressors"] = multiplicative_regressors + + # Unpack and process lags + lags_input = None + if "lags" in components_stacker.feature_indices: + lags_input = components_stacker.unstack_component(component_name="lags", batch_tensor=input_tensor) + nonstationary_components = ( trend[:, : self.n_lags, 0] + additive_components_nonstationary[:, : self.n_lags, 0] + trend[:, : self.n_lags, 0].detach() * multiplicative_components_nonstationary[:, : self.n_lags, 0] ) - stationarized_lags = inputs["lags"] - nonstationary_components + stationarized_lags = lags_input - nonstationary_components lags = self.auto_regression(lags=stationarized_lags) additive_components += lags components["lags"] = lags - if "covariates" in inputs: - covariates = self.forward_covar_net(covariates=inputs["covariates"]) + # Unpack and process covariates + covariates_input = None + if self.config_lagged_regressors and self.config_lagged_regressors.regressors is not None: + covariates_input = components_stacker.unstack_component( + component_name="lagged_regressors", batch_tensor=input_tensor + ) + covariates = self.forward_covar_net(covariates=covariates_input) additive_components += covariates components["covariates"] = covariates - # combine all non-stationary components over forecast range + # Combine components and compute predictions predictions_nonstationary = ( - trend[:, self.n_lags : inputs["time"].shape[1], :] - + additive_components_nonstationary[:, self.n_lags : inputs["time"].shape[1], :] - + trend[:, self.n_lags : inputs["time"].shape[1], :].detach() - * multiplicative_components_nonstationary[:, self.n_lags : inputs["time"].shape[1], :] + trend[:, self.n_lags : time_input.shape[1], :] + + additive_components_nonstationary[:, self.n_lags : time_input.shape[1], :] + + trend[:, self.n_lags : time_input.shape[1], :].detach() + * multiplicative_components_nonstationary[:, self.n_lags : time_input.shape[1], :] ) - prediction = predictions_nonstationary + additive_components # dimensions - [batch, n_forecasts, no_quantiles] + prediction = predictions_nonstationary + additive_components - # check for crossing quantiles and correct them here - if "predict_mode" in inputs.keys() and inputs["predict_mode"]: - predict_mode = True - else: - predict_mode = False + # Correct crossing quantiles prediction_with_quantiles = self._compute_quantile_forecasts_from_diffs(prediction, predict_mode) - # component calculation + # Compute components if required if compute_components_flag: - components = self.compute_components(inputs, components, meta) + components = self.compute_components( + time_input, + seasonalities_input, + lags_input, + covariates_input, + additive_events_input, + multiplicative_events_input, + additive_regressors_input, + multiplicative_regressors_input, + components, + meta, + ) else: components = None return prediction_with_quantiles, components - def compute_components(self, inputs: Dict, components_raw: Dict, meta: Dict) -> Dict: - """This method returns the values of each model component. - Note - ---- - Time input is required. Minimum model setup is a linear trend. 
- Parameters - ---------- - inputs : dict - Model inputs, each of len(df) but with varying dimensions - Note - ---- - Contains the following data: - Model Inputs - * ``time`` (torch.Tensor , loat), normalized time, dims: (batch, n_forecasts) - * ``lags`` (torch.Tensor, float), dims: (batch, n_lags) - * ``seasonalities`` (torch.Tensor, float), dict of named seasonalities (keys) with their features - (values), dims of each dict value (batch, n_forecasts, n_features) - * ``covariates`` (torch.Tensor, float), dict of named covariates (keys) with their features - (values), dims of each dict value: (batch, n_lags) - * ``events`` (torch.Tensor, float), all event features, dims (batch, n_forecasts, n_features) - * ``regressors``(torch.Tensor, float), all regressor features, dims (batch, n_forecasts, n_features) - components_raw : dict - components to be computed - ------- - dict - Containing forecast coomponents with elements of dims (batch, n_forecasts) - """ + def compute_components( + self, + time_input, + seasonality_input, + lags_input, + covariates_input, + additive_events_input, + multiplicative_events_input, + additive_regressors_input, + multiplicative_regressors_input, + components_raw: Dict, + meta: Dict, + ) -> Dict: components = {} - components["trend"] = components_raw["trend"][:, self.n_lags : inputs["time"].shape[1], :] - if self.config_trend is not None and "seasonalities" in inputs: - for name, features in inputs["seasonalities"].items(): + components["trend"] = components_raw["trend"][:, self.n_lags : time_input.shape[1], :] + if self.config_trend is not None and seasonality_input is not None: + for name, features in seasonality_input.items(): components[f"season_{name}"] = self.seasonality.compute_fourier( - features=features[:, self.n_lags : inputs["time"].shape[1], :], name=name, meta=meta + features=features[:, self.n_lags : time_input.shape[1], :], name=name, meta=meta ) - if self.n_lags > 0 and "lags" in inputs: + if self.n_lags > 0 and lags_input is not None: components["ar"] = components_raw["lags"] - if ( - self.config_lagged_regressors is not None - and self.config_lagged_regressors.regressors is not None - and "covariates" in inputs - ): + if self.config_lagged_regressors is not None and covariates_input is not None: # Combined forward pass all_covariates = components_raw["covariates"] # Calculate the contribution of each covariate on each forecast @@ -694,7 +696,7 @@ def compute_components(self, inputs: Dict, components_raw: Dict, meta: Dict) -> covar_attribution_sum_per_forecast = reduce( torch.add, [torch.sum(covar, axis=1) for _, covar in covar_attributions.items()] ).to(all_covariates.device) - for name in inputs["covariates"].keys(): + for name in covariates_input.keys(): # Distribute the contribution of the current covariate to the combined forward pass # 1. Calculate the relative share of each covariate on the total attributions # 2. 
Multiply the relative share with the combined forward pass @@ -705,59 +707,62 @@ def compute_components(self, inputs: Dict, components_raw: Dict, meta: Dict) -> covar_attribution_sum_per_forecast, ).reshape(self.n_forecasts, len(self.quantiles)), ) - if (self.config_events is not None or self.config_holidays is not None) and "events" in inputs: - if "additive" in inputs["events"].keys(): + if self.config_events is not None or self.config_holidays is not None: + if additive_events_input is not None: components["events_additive"] = components_raw["additive_events"][ - :, self.n_lags : inputs["time"].shape[1], : + :, self.n_lags : time_input.shape[1], : ] - if "multiplicative" in inputs["events"].keys(): + if multiplicative_events_input is not None: components["events_multiplicative"] = components_raw["multiplicative_events"][ - :, self.n_lags : inputs["time"].shape[1], : + :, self.n_lags : time_input.shape[1], : ] for event, configs in self.events_dims.items(): mode = configs["mode"] indices = configs["event_indices"] if mode == "additive": - features = inputs["events"]["additive"][:, self.n_lags : inputs["time"].shape[1], :] + features = additive_events_input[:, self.n_lags : time_input.shape[1], :] params = self.event_params["additive"] else: - features = inputs["events"]["multiplicative"][:, self.n_lags : inputs["time"].shape[1], :] + features = multiplicative_events_input[:, self.n_lags : time_input.shape[1], :] params = self.event_params["multiplicative"] components[f"event_{event}"] = self.scalar_features_effects( features=features, params=params, indices=indices ) - if self.config_regressors.regressors is not None and "regressors" in inputs: - if "additive" in inputs["regressors"].keys(): + if self.config_regressors.regressors is not None: + if additive_regressors_input is not None: components["future_regressors_additive"] = components_raw["additive_regressors"][ - :, self.n_lags : inputs["time"].shape[1], : + :, self.n_lags : time_input.shape[1], : ] - if "multiplicative" in inputs["regressors"].keys(): + if multiplicative_regressors_input is not None: components["future_regressors_multiplicative"] = components_raw["multiplicative_regressors"][ - :, self.n_lags : inputs["time"].shape[1], : + :, self.n_lags : time_input.shape[1], : ] for regressor, configs in self.future_regressors.regressors_dims.items(): mode = configs["mode"] index = [] index.append(configs["regressor_index"]) - features = inputs["regressors"][mode] - components[f"future_regressor_{regressor}"] = self.future_regressors( - features[:, self.n_lags : inputs["time"].shape[1], :], mode, indeces=index - ) - + if mode == "additive" and additive_regressors_input is not None: + components[f"future_regressor_{regressor}"] = self.future_regressors( + additive_regressors_input[:, self.n_lags : time_input.shape[1], :], mode, indeces=index + ) + if mode == "multiplicative" and multiplicative_regressors_input is not None: + components[f"future_regressor_{regressor}"] = self.future_regressors( + multiplicative_regressors_input[:, self.n_lags : time_input.shape[1], :], mode, indeces=index + ) return components def set_compute_components(self, compute_components_flag): self.compute_components_flag = compute_components_flag - def loss_func(self, inputs, predicted, targets): + def loss_func(self, time, predicted, targets): loss = None # Compute loss. no reduction. loss = self.config_train.loss_func(predicted, targets) if self.config_train.newer_samples_weight > 1.0: # Weigh newer samples more. 
- loss = loss * self._get_time_based_sample_weight(t=inputs["time"][:, self.n_lags :]) + loss = loss * self._get_time_based_sample_weight(t=time[:, self.n_lags :]) loss = loss.sum(dim=2).mean() # Regularize. if self.reg_enabled and not self.finding_lr: @@ -767,20 +772,24 @@ def loss_func(self, inputs, predicted, targets): return loss, reg_loss def training_step(self, batch, batch_idx): - inputs, targets, meta = batch + inputs_tensor, meta = batch + epoch_float = self.trainer.current_epoch + batch_idx / float(self.train_steps_per_epoch) self.train_progress = epoch_float / float(self.config_train.epochs) + + targets = self.train_components_stacker.unstack_component("targets", batch_tensor=inputs_tensor) + time = self.train_components_stacker.unstack_component("time", batch_tensor=inputs_tensor) # Global-local if self.meta_used_in_model: meta_name_tensor = torch.tensor([self.id_dict[i] for i in meta["df_name"]], device=self.device) else: meta_name_tensor = None # Run forward calculation - predicted, _ = self.forward(inputs, meta_name_tensor) + predicted, _ = self.forward(inputs_tensor, self.train_components_stacker, meta_name_tensor) # Store predictions in self for later network visualization self.train_epoch_prediction = predicted # Calculate loss - loss, reg_loss = self.loss_func(inputs, predicted, targets) + loss, reg_loss = self.loss_func(time, predicted, targets) # Optimization optimizer = self.optimizers() @@ -803,6 +812,7 @@ def training_step(self, batch, batch_idx): if self.metrics_enabled and not self.finding_lr: predicted_denorm = self.denormalize(predicted[:, :, 0]) target_denorm = self.denormalize(targets.squeeze(dim=2)) + target_denorm = target_denorm.contiguous() self.log_dict(self.metrics_train(predicted_denorm, target_denorm), **self.log_args) self.log("Loss", loss, **self.log_args) self.log("RegLoss", reg_loss, **self.log_args) @@ -811,54 +821,66 @@ def training_step(self, batch, batch_idx): return loss def validation_step(self, batch, batch_idx): - inputs, targets, meta = batch + inputs_tensor, meta = batch + targets = self.val_components_stacker.unstack_component("targets", batch_tensor=inputs_tensor) + time = self.val_components_stacker.unstack_component("time", batch_tensor=inputs_tensor) # Global-local if self.meta_used_in_model: meta_name_tensor = torch.tensor([self.id_dict[i] for i in meta["df_name"]], device=self.device) else: meta_name_tensor = None # Run forward calculation - predicted, _ = self.forward(inputs, meta_name_tensor) + predicted, _ = self.forward(inputs_tensor, self.val_components_stacker, meta_name_tensor) # Calculate loss - loss, reg_loss = self.loss_func(inputs, predicted, targets) + loss, reg_loss = self.loss_func(time, predicted, targets) # Metrics if self.metrics_enabled: predicted_denorm = self.denormalize(predicted[:, :, 0]) target_denorm = self.denormalize(targets.squeeze(dim=2)) + target_denorm = target_denorm.contiguous() self.log_dict(self.metrics_val(predicted_denorm, target_denorm), **self.log_args) self.log("Loss_val", loss, **self.log_args) self.log("RegLoss_val", reg_loss, **self.log_args) def test_step(self, batch, batch_idx): - inputs, targets, meta = batch + inputs_tensor, meta = batch + targets = self.test_components_stacker.unstack_component("targets", batch_tensor=inputs_tensor) + time = self.test_components_stacker.unstack_component("time", batch_tensor=inputs_tensor) # Global-local if self.meta_used_in_model: meta_name_tensor = torch.tensor([self.id_dict[i] for i in meta["df_name"]], device=self.device) else: meta_name_tensor = 
None
        # Run forward calculation
-        predicted, _ = self.forward(inputs, meta_name_tensor)
+        predicted, _ = self.forward(inputs_tensor, self.test_components_stacker, meta_name_tensor)
        # Calculate loss
-        loss, reg_loss = self.loss_func(inputs, predicted, targets)
+        loss, reg_loss = self.loss_func(time, predicted, targets)
        # Metrics
        if self.metrics_enabled:
            predicted_denorm = self.denormalize(predicted[:, :, 0])
            target_denorm = self.denormalize(targets.squeeze(dim=2))
+            # target_denorm = target_denorm.detach().clone()
+            target_denorm = target_denorm.contiguous()
            self.log_dict(self.metrics_val(predicted_denorm, target_denorm), **self.log_args)
            self.log("Loss_test", loss, **self.log_args)
            self.log("RegLoss_test", reg_loss, **self.log_args)

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
-        inputs, _, meta = batch
+        inputs_tensor, meta = batch
        # Global-local
        if self.meta_used_in_model:
            meta_name_tensor = torch.tensor([self.id_dict[i] for i in meta["df_name"]], device=self.device)
        else:
            meta_name_tensor = None
-        # Add predict_mode flag to dataset
-        inputs["predict_mode"] = True
+
        # Run forward calculation
-        prediction, components = self.forward(inputs, meta_name_tensor, self.compute_components_flag)
+        prediction, components = self.forward(
+            inputs_tensor,
+            self.predict_components_stacker,
+            meta_name_tensor,
+            self.compute_components_flag,
+            predict_mode=True,
+        )
        return prediction, components

    def configure_optimizers(self):
diff --git a/neuralprophet/utils_time_dataset.py b/neuralprophet/utils_time_dataset.py
new file mode 100644
index 000000000..075dff8e1
--- /dev/null
+++ b/neuralprophet/utils_time_dataset.py
@@ -0,0 +1,306 @@
+from collections import OrderedDict
+
+import torch
+
+
+class ComponentStacker:
+    def __init__(
+        self,
+        n_lags,
+        n_forecasts,
+        max_lags,
+        feature_indices={},
+        config_seasonality=None,
+        lagged_regressor_config=None,
+    ):
+        """
+        Initializes the ComponentStacker with the necessary parameters.
+
+        Args:
+            n_lags (int): Number of lags used in the model.
+            n_forecasts (int): Number of forecasts to be made.
+            max_lags (int): Maximum number of lags used in the model.
+            feature_indices (dict): A dictionary containing the start and end indices of different features in the tensor.
+            config_seasonality (object, optional): Configuration object that defines the seasonality periods.
+            lagged_regressor_config (dict, optional): Configuration dictionary that defines the lagged regressors and their properties.
+        """
+        self.n_lags = n_lags
+        self.n_forecasts = n_forecasts
+        self.max_lags = max_lags
+        self.feature_indices = feature_indices
+        self.config_seasonality = config_seasonality
+        self.lagged_regressor_config = lagged_regressor_config
+
+    def unstack_component(self, component_name, batch_tensor):
+        """
+        Routes the unstacking process to the appropriate function based on the component name.
+
+        Args:
+            component_name (str): The name of the component to unstack.
+
+        Returns:
+            Various: The output of the specific unstacking function.
+ """ + if component_name == "targets": + return self.unstack_targets(batch_tensor) + elif component_name == "time": + return self.unstack_time(batch_tensor) + elif component_name == "seasonalities": + return self.unstack_seasonalities(batch_tensor) + elif component_name == "lagged_regressors": + return self.unstack_lagged_regressors(batch_tensor) + elif component_name == "lags": + return self.unstack_lags(batch_tensor) + elif component_name == "additive_events": + return self.unstack_additive_events(batch_tensor) + elif component_name == "multiplicative_events": + return self.unstack_multiplicative_events(batch_tensor) + elif component_name == "additive_regressors": + return self.unstack_additive_regressors(batch_tensor) + elif component_name == "multiplicative_regressors": + return self.unstack_multiplicative_regressors(batch_tensor) + else: + raise ValueError(f"Unknown component name: {component_name}") + + def unstack_targets(self, batch_tensor): + targets_start_idx, targets_end_idx = self.feature_indices["targets"] + if self.max_lags > 0: + return batch_tensor[:, self.max_lags : self.max_lags + self.n_forecasts, targets_start_idx].unsqueeze(2) + else: + return batch_tensor[:, targets_start_idx : targets_end_idx + 1].unsqueeze(1) + + def unstack_time(self, batch_tensor): + start_idx, end_idx = self.feature_indices["time"] + if self.max_lags > 0: + return batch_tensor[:, self.max_lags - self.n_lags : self.max_lags + self.n_forecasts, start_idx] + else: + return batch_tensor[:, start_idx : end_idx + 1] + + def unstack_lags(self, batch_tensor): + lags_start_idx, _ = self.feature_indices["lags"] + return batch_tensor[:, self.max_lags - self.n_lags : self.max_lags, lags_start_idx] + + def unstack_lagged_regressors(self, batch_tensor): + lagged_regressors = OrderedDict() + if self.lagged_regressor_config is not None and self.lagged_regressor_config.regressors is not None: + for name, lagged_regressor in self.lagged_regressor_config.regressors.items(): + lagged_regressor_key = f"lagged_regressor_{name}" + if lagged_regressor_key in self.feature_indices: + lagged_regressor_start_idx, _ = self.feature_indices[lagged_regressor_key] + covar_lags = lagged_regressor.n_lags + lagged_regressor_offset = self.max_lags - covar_lags + lagged_regressors[name] = batch_tensor[ + :, + lagged_regressor_offset : lagged_regressor_offset + covar_lags, + lagged_regressor_start_idx, + ] + return lagged_regressors + + def unstack_seasonalities(self, batch_tensor): + seasonalities = OrderedDict() + if self.max_lags > 0: + for seasonality_name in self.config_seasonality.periods.keys(): + seasonality_key = f"seasonality_{seasonality_name}" + if seasonality_key in self.feature_indices: + seasonality_start_idx, seasonality_end_idx = self.feature_indices[seasonality_key] + seasonalities[seasonality_name] = batch_tensor[ + :, + self.max_lags - self.n_lags : self.max_lags + self.n_forecasts, + seasonality_start_idx:seasonality_end_idx, + ] + else: + for seasonality_name in self.config_seasonality.periods.keys(): + seasonality_key = f"seasonality_{seasonality_name}" + if seasonality_key in self.feature_indices: + seasonality_start_idx, seasonality_end_idx = self.feature_indices[seasonality_key] + seasonalities[seasonality_name] = batch_tensor[ + :, seasonality_start_idx:seasonality_end_idx + ].unsqueeze(1) + + return seasonalities + + def unstack_additive_events(self, batch_tensor): + if self.max_lags > 0: + events_start_idx, events_end_idx = self.feature_indices["additive_events"] + future_offset = self.max_lags - 
self.n_lags + return batch_tensor[ + :, future_offset : future_offset + self.n_forecasts + self.n_lags, events_start_idx : events_end_idx + 1 + ] + else: + events_start_idx, events_end_idx = self.feature_indices["additive_events"] + return batch_tensor[:, events_start_idx : events_end_idx + 1].unsqueeze(1) + + def unstack_multiplicative_events(self, batch_tensor): + if self.max_lags > 0: + events_start_idx, events_end_idx = self.feature_indices["multiplicative_events"] + return batch_tensor[ + :, self.max_lags - self.n_lags : self.max_lags + self.n_forecasts, events_start_idx : events_end_idx + 1 + ] + else: + events_start_idx, events_end_idx = self.feature_indices["multiplicative_events"] + return batch_tensor[:, events_start_idx : events_end_idx + 1].unsqueeze(1) + + def unstack_additive_regressors(self, batch_tensor): + if self.max_lags > 0: + regressors_start_idx, regressors_end_idx = self.feature_indices["additive_regressors"] + return batch_tensor[ + :, + self.max_lags - self.n_lags : self.max_lags + self.n_forecasts, + regressors_start_idx : regressors_end_idx + 1, + ] + else: + regressors_start_idx, regressors_end_idx = self.feature_indices["additive_regressors"] + return batch_tensor[:, regressors_start_idx : regressors_end_idx + 1].unsqueeze(1) + + def unstack_multiplicative_regressors(self, batch_tensor): + if self.max_lags > 0: + regressors_start_idx, regressors_end_idx = self.feature_indices["multiplicative_regressors"] + future_offset = self.max_lags - self.n_lags + return batch_tensor[ + :, + future_offset : future_offset + self.n_forecasts + self.n_lags, + regressors_start_idx : regressors_end_idx + 1, + ] + else: + regressors_start_idx, regressors_end_idx = self.feature_indices["multiplicative_regressors"] + return batch_tensor[:, regressors_start_idx : regressors_end_idx + 1].unsqueeze(1) + + def stack_trend_component(self, df_tensors, feature_list, current_idx): + """ + Stack the trend (time) feature. + """ + time_tensor = df_tensors["t"].unsqueeze(-1) # Shape: [T, 1] + feature_list.append(time_tensor) + self.feature_indices["time"] = (current_idx, current_idx) + return current_idx + 1 + + def stack_lags_component(self, df_tensors, feature_list, current_idx, n_lags): + """ + Stack the lags feature. + """ + if n_lags >= 1 and "y_scaled" in df_tensors: + lags_tensor = df_tensors["y_scaled"].unsqueeze(-1) + feature_list.append(lags_tensor) + self.feature_indices["lags"] = (current_idx, current_idx) + return current_idx + 1 + return current_idx + + def stack_targets_component(self, df_tensors, feature_list, current_idx): + """ + Stack the targets feature. + """ + if "y_scaled" in df_tensors: + targets_tensor = df_tensors["y_scaled"].unsqueeze(-1) + feature_list.append(targets_tensor) + self.feature_indices["targets"] = (current_idx, current_idx) + return current_idx + 1 + return current_idx + + def stack_lagged_regerssors_component(self, df_tensors, feature_list, current_idx, config_lagged_regressors): + """ + Stack the lagged regressor features. 
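+
+        Example (illustrative; ``temp`` and ``price`` are hypothetical
+        regressor names)::
+
+            # With config_lagged_regressors.regressors == {"temp": ..., "price": ...}
+            # and current_idx == 5, this appends one [T, 2] tensor and records:
+            #   feature_indices["lagged_regressor_temp"]  -> (5, 6)
+            #   feature_indices["lagged_regressor_price"] -> (6, 7)
+            # returning 7 as the next free index.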
+        """
+        if config_lagged_regressors is not None and config_lagged_regressors.regressors is not None:
+            lagged_regressor_tensors = [
+                df_tensors[name].unsqueeze(-1) for name in config_lagged_regressors.regressors.keys()
+            ]
+            stacked_lagged_regressor_tensor = torch.cat(lagged_regressor_tensors, dim=-1)
+            feature_list.append(stacked_lagged_regressor_tensor)
+            num_features = stacked_lagged_regressor_tensor.size(-1)
+            for i, name in enumerate(config_lagged_regressors.regressors.keys()):
+                self.feature_indices[f"lagged_regressor_{name}"] = (
+                    current_idx + i,
+                    current_idx + i + 1,
+                )
+            return current_idx + num_features
+        return current_idx
+
+    def stack_additive_events_component(
+        self,
+        df_tensors,
+        feature_list,
+        current_idx,
+        additive_event_and_holiday_names,
+    ):
+        """
+        Stack the additive event and holiday features.
+        """
+        if additive_event_and_holiday_names:
+            additive_events_tensor = torch.cat(
+                [df_tensors[name].unsqueeze(-1) for name in additive_event_and_holiday_names],
+                dim=1,
+            )
+            feature_list.append(additive_events_tensor)
+            self.feature_indices["additive_events"] = (
+                current_idx,
+                current_idx + additive_events_tensor.size(1) - 1,
+            )
+            return current_idx + additive_events_tensor.size(1)
+        return current_idx
+
+    def stack_multiplicative_events_component(
+        self, df_tensors, feature_list, current_idx, multiplicative_event_and_holiday_names
+    ):
+        """
+        Stack the multiplicative event and holiday features.
+        """
+        if multiplicative_event_and_holiday_names:
+            multiplicative_events_tensor = torch.cat(
+                [df_tensors[name].unsqueeze(-1) for name in multiplicative_event_and_holiday_names], dim=1
+            )
+            feature_list.append(multiplicative_events_tensor)
+            self.feature_indices["multiplicative_events"] = (
+                current_idx,
+                current_idx + multiplicative_events_tensor.size(1) - 1,
+            )
+            return current_idx + multiplicative_events_tensor.size(1)
+        return current_idx
+
+    def stack_additive_regressors_component(self, df_tensors, feature_list, current_idx, additive_regressors_names):
+        """
+        Stack the additive regressor features.
+        """
+        if additive_regressors_names:
+            additive_regressors_tensor = torch.cat(
+                [df_tensors[name].unsqueeze(-1) for name in additive_regressors_names], dim=1
+            )
+            feature_list.append(additive_regressors_tensor)
+            self.feature_indices["additive_regressors"] = (
+                current_idx,
+                current_idx + additive_regressors_tensor.size(1) - 1,
+            )
+            return current_idx + additive_regressors_tensor.size(1)
+        return current_idx
+
+    def stack_multiplicative_regressors_component(
+        self, df_tensors, feature_list, current_idx, multiplicative_regressors_names
+    ):
+        """
+        Stack the multiplicative regressor features.
+        """
+        if multiplicative_regressors_names:
+            multiplicative_regressors_tensor = torch.cat(
+                [df_tensors[name].unsqueeze(-1) for name in multiplicative_regressors_names], dim=1
+            )  # Shape: [T, num_multiplicative_regressors]
+            feature_list.append(multiplicative_regressors_tensor)
+            self.feature_indices["multiplicative_regressors"] = (
+                current_idx,
+                current_idx + len(multiplicative_regressors_names) - 1,
+            )
+            return current_idx + len(multiplicative_regressors_names)
+        return current_idx
+
+    def stack_seasonalities_component(self, feature_list, current_idx, config_seasonality, seasonalities):
+        """
+        Stack the seasonality features.
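+
+        Example (illustrative; assumes a single "weekly" seasonality whose
+        precomputed Fourier features have size 6 along dim 1)::
+
+            # With current_idx == 10, this appends the feature block, records
+            #   feature_indices["seasonality_weekly"] -> (10, 16)
+            # (end index exclusive, as sliced by unstack_seasonalities, unlike
+            # the inclusive convention of the other components),
+            # and returns 16 as the next free index.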
+ """ + if config_seasonality and config_seasonality.periods: + for seasonality_name, features in seasonalities.items(): + seasonal_tensor = features + feature_list.append(seasonal_tensor) + self.feature_indices[f"seasonality_{seasonality_name}"] = ( + current_idx, + current_idx + seasonal_tensor.size(1), + ) + current_idx += seasonal_tensor.size(1) + return current_idx diff --git a/tests/test_future_regressor_nn.py b/tests/test_future_regressor_nn.py index cb106d443..e394bdccb 100644 --- a/tests/test_future_regressor_nn.py +++ b/tests/test_future_regressor_nn.py @@ -180,7 +180,9 @@ def test_future_regressor_nn_shared_2(): # log.info("future regressor with NN shared coef 2") # df = pd.read_csv(ENERGY_TEMP_DAILY_FILE, nrows=NROWS) # m = NeuralProphet( -# epochs=EPOCHS, batch_size=BATCH_SIZE, learning_rate=LR, +# epochs=EPOCHS, +# batch_size=BATCH_SIZE, +# learning_rate=LR, # yearly_seasonality=False, # weekly_seasonality=False, # daily_seasonality=True, diff --git a/tests/test_train_config.py b/tests/test_train_config.py index 6867a9bb7..7ceba6cc0 100644 --- a/tests/test_train_config.py +++ b/tests/test_train_config.py @@ -50,7 +50,9 @@ def test_custom_lr_scheduler(): scheduler="CosineAnnealingWarmRestarts", scheduler_args={"T_0": 5, "T_mult": 2}, ) - _ = m.fit(df, freq="D") + metrics = m.fit(df, freq="D") + print(f"metrics = {metrics}") + # Set in NeuralProphet(), no args m = NeuralProphet( epochs=EPOCHS, @@ -58,7 +60,8 @@ def test_custom_lr_scheduler(): learning_rate=LR, scheduler="StepLR", ) - _ = m.fit(df, freq="D") + metrics = m.fit(df, freq="D") + print(f"metrics = {metrics}") # Set in fit() m = NeuralProphet(epochs=EPOCHS, batch_size=BATCH_SIZE, learning_rate=LR) @@ -68,7 +71,7 @@ def test_custom_lr_scheduler(): scheduler="ExponentialLR", scheduler_args={"gamma": 0.95}, ) - + print(f"metrics = {metrics}") # Set in fit(), no args m = NeuralProphet(epochs=EPOCHS, batch_size=BATCH_SIZE, learning_rate=LR) _ = m.fit( @@ -76,3 +79,4 @@ def test_custom_lr_scheduler(): freq="D", scheduler="OneCycleLR", ) + print(f"metrics = {metrics}") diff --git a/tests/test_unit.py b/tests/test_unit.py index 42f19d218..6c234ae57 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -10,7 +10,7 @@ import pytest from torch.utils.data import DataLoader -from neuralprophet import NeuralProphet, configure, df_utils, time_dataset +from neuralprophet import NeuralProphet, configure, df_utils, time_dataset, utils_time_dataset from neuralprophet.data.process import _create_dataset, _handle_missing_data from neuralprophet.data.transform import _normalize @@ -97,6 +97,14 @@ def test_timedataset_minimal(): df = df_utils.normalize(df, global_data_params) df["ID"] = "__df__" + components_stacker = utils_time_dataset.ComponentStacker( + n_lags=n_lags, + n_forecasts=n_forecasts, + max_lags=n_lags, + config_seasonality=None, + lagged_regressor_config=None, + ) + dataset = time_dataset.TimeDataset( df=df, predict_mode=False, @@ -110,16 +118,18 @@ def test_timedataset_minimal(): config_regressors=None, config_lagged_regressors=None, config_missing=config_missing, + config_model=None, + components_stacker=components_stacker, ) - inputs, targets, meta = dataset.__getitem__(0) - # inputs50, targets50, meta50 = dataset.__getitem__(50) - log.debug(f"(n_forecasts {n_forecasts}, n_lags {n_lags})") - log.debug(f"tabularized targets: {targets.shape}") - log.debug( - "tabularized inputs: {}".format( - "; ".join(["{}: {}".format(inp, values.shape) for inp, values in inputs.items()]) - ) - ) + input, meta = dataset.__getitem__(0) 
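+        # Illustrative sketch (commented to keep the test minimal): the single
+        # stacked sample tensor can be split back into named components, e.g.
+        #   targets = components_stacker.unstack_component("targets", input.unsqueeze(0))
+        # assuming the dataset populated the stacker's feature_indices.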
+ # # inputs50, targets50, meta50 = dataset.__getitem__(50) + # log.debug(f"(n_forecasts {n_forecasts}, n_lags {n_lags})") + # log.debug(f"tabularized targets: {targets.shape}") + # log.debug( + # "tabularized inputs: {}".format( + # "; ".join(["{}: {}".format(inp, values.shape) for inp, values in inputs.items()]) + # ) + # ) def test_normalize(): @@ -683,8 +693,15 @@ def test_globaltimedataset(): ) m.config_normalization = config_normalization df_global = _normalize(df=df_global, config_normalization=m.config_normalization) - _create_dataset(m, df_global, predict_mode=False) - _create_dataset(m, df_global, predict_mode=True) + components_stacker = utils_time_dataset.ComponentStacker( + n_lags=m.n_lags, + n_forecasts=m.n_forecasts, + max_lags=m.max_lags, + config_seasonality=m.config_seasonality, + lagged_regressor_config=m.config_lagged_regressors, + ) + _create_dataset(m, df_global, predict_mode=False, components_stacker=components_stacker) + _create_dataset(m, df_global, predict_mode=True, components_stacker=components_stacker) # lagged_regressors, future_regressors df4 = df.copy() @@ -706,8 +723,15 @@ def test_globaltimedataset(): config_normalization.init_data_params(df4, m.config_lagged_regressors, m.config_regressors, m.config_events) m.config_normalization = config_normalization df4 = _normalize(df=df4, config_normalization=m.config_normalization) - _create_dataset(m, df4, predict_mode=False) - _create_dataset(m, df4, predict_mode=True) + components_stacker = utils_time_dataset.ComponentStacker( + n_lags=m.n_lags, + n_forecasts=m.n_forecasts, + max_lags=m.max_lags, + config_seasonality=m.config_seasonality, + lagged_regressor_config=m.config_lagged_regressors, + ) + _create_dataset(m, df4, predict_mode=False, components_stacker=components_stacker) + _create_dataset(m, df4, predict_mode=True, components_stacker=components_stacker) def test_dataloader(): @@ -736,9 +760,16 @@ def test_dataloader(): config_normalization.init_data_params(df_global, m.config_lagged_regressors, m.config_regressors, m.config_events) m.config_normalization = config_normalization df_global = _normalize(df=df_global, config_normalization=m.config_normalization) - dataset = _create_dataset(m, df_global, predict_mode=False) + components_stacker = utils_time_dataset.ComponentStacker( + n_lags=3, + n_forecasts=2, + max_lags=3, + config_seasonality=None, + lagged_regressor_config=None, + ) + dataset = _create_dataset(m, df_global, predict_mode=False, components_stacker=components_stacker) loader = DataLoader(dataset, batch_size=min(1024, len(df)), shuffle=True, drop_last=False) - for inputs, targets, meta in loader: + for _, meta in loader: assert set(meta["df_name"]) == set(df_global["ID"].unique()) break @@ -864,6 +895,13 @@ def test_too_many_NaN(): df["ID"] = "__df__" # Check if ValueError is thrown, if NaN values remain after auto-imputing with pytest.raises(ValueError): + components_stacker = utils_time_dataset.ComponentStacker( + n_lags=n_lags, + n_forecasts=n_forecasts, + max_lags=n_lags, + config_seasonality=None, + lagged_regressor_config=None, + ) time_dataset.TimeDataset( df=df, predict_mode=False, @@ -877,6 +915,8 @@ def test_too_many_NaN(): config_regressors=None, config_lagged_regressors=None, config_missing=config_missing, + config_model=None, + components_stacker=components_stacker, )
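
For reference, a minimal round-trip sketch of the ComponentStacker API introduced above (illustrative only: a lag-free toy setup with hypothetical tensors; the equivalent stacking is assumed to happen inside TimeDataset in this patch):

import torch
from neuralprophet.utils_time_dataset import ComponentStacker

# Lag-free toy setup: one time column and one target column per sample.
stacker = ComponentStacker(n_lags=0, n_forecasts=1, max_lags=0)

df_tensors = {"t": torch.linspace(0.0, 1.0, 10), "y_scaled": torch.randn(10)}
feature_list = []
idx = stacker.stack_trend_component(df_tensors, feature_list, 0)
idx = stacker.stack_targets_component(df_tensors, feature_list, idx)

# One row per sample, one column per feature -- the stacked layout.
stacked = torch.cat(feature_list, dim=-1)  # shape [10, 2]

time = stacker.unstack_component("time", stacked)       # shape [10, 1]
targets = stacker.unstack_component("targets", stacked)  # shape [10, 1, 1]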