Skip to content

Commit

Permalink
[Major] Speedup dataset get_item (#1636)
Browse files Browse the repository at this point in the history
* updated dataset get_item

* fixed linting issues

* make targets contiguous

* fixed ruff warnings

* Unpack incrementally when needed

* adjust forecaster

* separate unpacking logic

* added featureExtractor class

* separate packing logic

* fixed linting issues

* fixed covariates

* added features extractor

* rename classes and functions

* remove prints in time_net

* init Stacker in forecaster

* fix time-dataset

* remove last _create component stacker

* fix lagged config

* uncomment glocal tests

* fix ruff

* uncomment future reg tests

* comment test

---------

Co-authored-by: ourownstory <[email protected]>
  • Loading branch information
MaiBe-ctrl and ourownstory authored Sep 3, 2024
1 parent ae560c1 commit 9c4963c
Show file tree
Hide file tree
Showing 13 changed files with 718 additions and 480 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@
"try:\n",
" # it already installed dependencies\n",
" from torchviz import make_dot\n",
"except:\n",
"except ImportError:\n",
" # install graphviz on system\n",
" import platform\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@
"try:\n",
" # it already installed dependencies\n",
" from torchviz import make_dot\n",
"except:\n",
"except ImportError:\n",
" # install graphviz on system\n",
" import platform\n",
"\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
" # it already installed dependencies\n",
" from torchsummary import summary\n",
" from torchviz import make_dot\n",
"except:\n",
"except ImportError:\n",
" # install graphviz on system\n",
" import platform\n",
"\n",
Expand Down Expand Up @@ -69,7 +69,7 @@
"source": [
"try:\n",
" from neuralprophet import NeuralProphet\n",
"except:\n",
"except ImportError:\n",
" # if NeuralProphet is not installed yet:\n",
" !pip install git+https://github.com/ourownstory/neural_prophet.git\n",
" from neuralprophet import NeuralProphet"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,12 @@
"# Set loggers to ERROR level\n",
"import logging\n",
"import warnings\n",
"from neuralprophet import set_log_level\n",
"\n",
"\n",
"logging.getLogger(\"prophet\").setLevel(logging.ERROR)\n",
"warnings.filterwarnings(\"ignore\")\n",
"\n",
"from neuralprophet import set_log_level\n",
"\n",
"set_log_level(\"ERROR\")"
]
Expand Down
5 changes: 5 additions & 0 deletions neuralprophet/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

@dataclass
class Model:
features_map: dict
lagged_reg_layers: Optional[List[int]]
quantiles: Optional[List[float]] = None

def setup_quantiles(self):
Expand All @@ -42,6 +44,9 @@ def setup_quantiles(self):
self.quantiles.insert(0, 0.5)


ConfigModel = Model


@dataclass
class Normalization:
normalize: str
Expand Down
4 changes: 3 additions & 1 deletion neuralprophet/data/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ def _handle_missing_data(
return df


def _create_dataset(model, df, predict_mode, prediction_frequency=None):
def _create_dataset(model, df, predict_mode, prediction_frequency=None, components_stacker=None):
"""Construct dataset from dataframe.
(Configured Hyperparameters can be overridden by explicitly supplying them.
Expand Down Expand Up @@ -626,5 +626,7 @@ def _create_dataset(model, df, predict_mode, prediction_frequency=None):
config_regressors=model.config_regressors,
config_lagged_regressors=model.config_lagged_regressors,
config_missing=model.config_missing,
config_model=model.config_model,
components_stacker=components_stacker,
# config_train=model.config_train, # no longer needed since JIT tabularization.
)
102 changes: 89 additions & 13 deletions neuralprophet/forecaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@
from matplotlib.axes import Axes
from torch.utils.data import DataLoader

from neuralprophet import configure, df_utils, np_types, time_dataset, time_net, utils, utils_lightning, utils_metrics
from neuralprophet import (
configure,
df_utils,
np_types,
time_dataset,
time_net,
utils,
utils_lightning,
utils_metrics,
utils_time_dataset,
)
from neuralprophet.data.process import (
_check_dataframe,
_convert_raw_predictions_to_raw_df,
Expand All @@ -34,6 +44,7 @@
from neuralprophet.plot_model_parameters_plotly import plot_parameters as plot_parameters_plotly
from neuralprophet.plot_utils import get_valid_configuration, log_warning_deprecation_plotly, select_plotting_backend
from neuralprophet.uncertainty import Conformal
from neuralprophet.utils_time_dataset import ComponentStacker

log = logging.getLogger("NP.forecaster")

Expand Down Expand Up @@ -503,6 +514,8 @@ def __init__(

# Model
self.config_model = configure.Model(
features_map={},
lagged_reg_layers=lagged_reg_layers,
quantiles=quantiles,
)
self.config_model.setup_quantiles()
Expand Down Expand Up @@ -1067,7 +1080,7 @@ def fit(
or any(value != 1 for value in self.num_seasonalities_modelled_dict.values())
)

##### Data Setup, and Training Setup #####
# Data Setup, and Training Setup
# Train Configuration: overwrite self.config_train with user provided values
if learning_rate is not None:
self.config_train.learning_rate = learning_rate
Expand Down Expand Up @@ -1152,7 +1165,22 @@ def fit(
# Set up DataLoaders: Train
# Create TimeDataset
# Note: _create_dataset() needs to be called after set_auto_seasonalities()
dataset = _create_dataset(self, df, predict_mode=False, prediction_frequency=self.prediction_frequency)
train_components_stacker = utils_time_dataset.ComponentStacker(
n_lags=self.n_lags,
n_forecasts=self.n_forecasts,
max_lags=self.max_lags,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
feature_indices={},
)

dataset = _create_dataset(
self,
df,
predict_mode=False,
prediction_frequency=self.prediction_frequency,
components_stacker=train_components_stacker,
)
# Determine the max_number of epochs
self.config_train.set_auto_batch_epoch(n_data=len(dataset))
# Create Train DataLoader
Expand All @@ -1162,6 +1190,7 @@ def fit(
shuffle=True,
num_workers=num_workers,
)

self.config_train.set_batches_per_epoch(len(loader))
log.info(f"Train Dataset size: {len(dataset)}")
log.info(f"Number of batches per training epoch: {len(loader)}")
Expand All @@ -1186,7 +1215,15 @@ def fit(
)
# df_val, _, _, _ = df_utils.prep_or_copy_df(df_val)
df_val = _normalize(df=df_val, config_normalization=self.config_normalization)
dataset_val = _create_dataset(self, df_val, predict_mode=False)
val_components_stacker = utils_time_dataset.ComponentStacker(
n_lags=self.n_lags,
max_lags=self.max_lags,
n_forecasts=self.n_forecasts,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
feature_indices={},
)
dataset_val = _create_dataset(self, df_val, predict_mode=False, components_stacker=val_components_stacker)
loader_val = DataLoader(dataset_val, batch_size=min(1024, len(dataset_val)), shuffle=False, drop_last=False)

# Init the Trainer
Expand All @@ -1206,12 +1243,16 @@ def fit(
if not self.fitted:
self.model = self._init_model()

self.model.set_components_stacker(components_stacker=train_components_stacker, mode="train")
if validation_enabled:
self.model.set_components_stacker(components_stacker=val_components_stacker, mode="val")

# Find suitable learning rate if not set
if self.config_train.learning_rate is None:
assert not self.fitted, "Learning rate must be provided for re-training a fitted model."

## Init a separate Model, Loader and Trainer copy for LR finder (optional, done for safety)
## Note Leads to a CUDA issue. Needs to be fixed before enabling this feature.
# Init a separate Model, Loader and Trainer copy for LR finder (optional, done for safety)
# Note Leads to a CUDA issue. Needs to be fixed before enabling this feature.
# model_lr_finder = self._init_model()
# loader_lr_finder = DataLoader(
# dataset,
Expand Down Expand Up @@ -1414,7 +1455,16 @@ def test(self, df: pd.DataFrame, verbose: bool = True):
)
df, _, _, _ = df_utils.prep_or_copy_df(df)
df = _normalize(df=df, config_normalization=self.config_normalization)
dataset = _create_dataset(self, df, predict_mode=False)
components_stacker = utils_time_dataset.ComponentStacker(
n_lags=self.n_lags,
n_forecasts=self.n_forecasts,
max_lags=self.max_lags,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
feature_indices={},
)
dataset = _create_dataset(self, df, predict_mode=False, components_stacker=components_stacker)
self.model.set_components_stacker(components_stacker, mode="test")
test_loader = DataLoader(dataset, batch_size=min(1024, len(dataset)), shuffle=False, drop_last=False)
# Use Lightning to calculate metrics
val_metrics = self.trainer.test(self.model, dataloaders=test_loader, verbose=verbose)
Expand Down Expand Up @@ -2047,6 +2097,13 @@ def predict_seasonal_components(self, df: pd.DataFrame, quantile: float = 0.5):
df = _normalize(df=df, config_normalization=self.config_normalization)
df_seasonal = pd.DataFrame()
for df_name, df_i in df.groupby("ID"):
feature_unstackor = ComponentStacker(
n_lags=0,
max_lags=0,
n_forecasts=1,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
)
dataset = time_dataset.TimeDataset(
df=df_i,
predict_mode=True,
Expand All @@ -2060,25 +2117,29 @@ def predict_seasonal_components(self, df: pd.DataFrame, quantile: float = 0.5):
config_regressors=self.config_regressors,
config_lagged_regressors=self.config_lagged_regressors,
config_missing=self.config_missing,
config_model=self.config_model,
components_stacker=feature_unstackor,
# config_train=self.config_train, # no longer needed since JIT tabularization.
)
self.model.set_components_stacker(feature_unstackor, mode="predict")
loader = DataLoader(dataset, batch_size=min(4096, len(df)), shuffle=False, drop_last=False)
predicted = {}
for name in self.config_seasonality.periods:
predicted[name] = list()
for inputs, _, meta in loader:
for inputs_tensor, meta in loader:
# Meta as a tensor for prediction
if self.model.config_seasonality is None:
meta_name_tensor = None
elif self.model.config_seasonality.global_local in ["local", "glocal"]:
meta = OrderedDict()
meta["df_name"] = [df_name for _ in range(inputs["time"].shape[0])]
time_input = feature_unstackor.unstack_component("time", inputs_tensor)
meta["df_name"] = [df_name for _ in range(time_input.shape[0])]
meta_name_tensor = torch.tensor([self.model.id_dict[i] for i in meta["df_name"]]) # type: ignore
else:
meta_name_tensor = None

seasonalities_input = feature_unstackor.unstack_component("seasonalities", inputs_tensor)
for name in self.config_seasonality.periods:
features = inputs["seasonalities"][name]
features = seasonalities_input[name]
quantile_index = self.config_model.quantiles.index(quantile)
y_season = torch.squeeze(
self.model.seasonality.compute_fourier(features=features, name=name, meta=meta_name_tensor)[
Expand Down Expand Up @@ -2880,7 +2941,22 @@ def _predict_raw(self, df, df_name, include_components=False, prediction_frequen
assert len(df["ID"].unique()) == 1
if "y_scaled" not in df.columns or "t" not in df.columns:
raise ValueError("Received unprepared dataframe to predict. " "Please call predict_dataframe_to_predict.")
dataset = _create_dataset(self, df, predict_mode=True, prediction_frequency=prediction_frequency)
components_stacker = utils_time_dataset.ComponentStacker(
n_lags=self.n_lags,
n_forecasts=self.n_forecasts,
max_lags=self.max_lags,
config_seasonality=self.config_seasonality,
lagged_regressor_config=self.config_lagged_regressors,
feature_indices={},
)
dataset = _create_dataset(
self,
df,
predict_mode=True,
prediction_frequency=prediction_frequency,
components_stacker=components_stacker,
)
self.model.set_components_stacker(components_stacker, mode="predict")
loader = DataLoader(dataset, batch_size=min(1024, len(df)), shuffle=False, drop_last=False)
if self.n_forecasts > 1:
dates = df["ds"].iloc[self.max_lags : -self.n_forecasts + 1]
Expand All @@ -2893,7 +2969,7 @@ def _predict_raw(self, df, df_name, include_components=False, prediction_frequen
self.model.set_covar_weights(self.model.get_covar_weights())
# Compute the predictions and components (if requested)
result = self.trainer.predict(self.model, loader)
# Extract the prediction and components
# unstack the prediction and components
predicted, component_vectors = zip(*result)
predicted = np.concatenate(predicted)

Expand Down
Loading

0 comments on commit 9c4963c

Please sign in to comment.