Skip to content

Commit

Permalink
remove dots from feature names in distributed (#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmoralez authored Jul 17, 2024
1 parent 1c08dc3 commit cd41f7a
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 203 deletions.
12 changes: 12 additions & 0 deletions mlforecast/distributed/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
Lags,
TargetTransform,
TimeSeries,
_build_transform_name,
_name_models,
)
from ..forecast import MLForecast
Expand All @@ -71,6 +72,7 @@ def __init__(
target_transforms: Optional[List[TargetTransform]] = None,
engine=None,
num_partitions: Optional[int] = None,
lag_transforms_namer: Optional[Callable] = None,
):
"""Create distributed forecast object
Expand Down Expand Up @@ -98,6 +100,8 @@ def __init__(
by the `fit` and `cross_validation` methods will be used. If a Ray
Dataset is provided and `num_partitions` is None, the partitioning
will be done by the `id_col`.
lag_transforms_namer : callable, optional(default=None)
Function that takes a transformation (either function or class), a lag and extra arguments and produces a name
"""
if not isinstance(models, dict) and not isinstance(models, list):
models = [models]
Expand All @@ -107,13 +111,21 @@ def __init__(
else:
models_with_names = models
self.models = models_with_names
if lag_transforms_namer is None:

def name_without_dots(tfm, lag, *args):
name = _build_transform_name(tfm, lag, args)
return name.replace(".", "_")

lag_transforms_namer = name_without_dots
self._base_ts = TimeSeries(
freq=freq,
lags=lags,
lag_transforms=lag_transforms,
date_features=date_features,
num_threads=num_threads,
target_transforms=target_transforms,
lag_transforms_namer=lag_transforms_namer,
)
self.engine = engine
self.num_partitions = num_partitions
Expand Down
100 changes: 55 additions & 45 deletions nbs/distributed.forecast.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
" Lags,\n",
" TargetTransform,\n",
" TimeSeries,\n",
" _build_transform_name,\n",
" _name_models,\n",
")\n",
"from mlforecast.forecast import MLForecast\n",
Expand Down Expand Up @@ -144,7 +145,8 @@
" num_threads: int = 1,\n",
" target_transforms: Optional[List[TargetTransform]] = None, \n",
" engine = None,\n",
" num_partitions: Optional[int] = None, \n",
" num_partitions: Optional[int] = None,\n",
" lag_transforms_namer: Optional[Callable] = None,\n",
" ):\n",
" \"\"\"Create distributed forecast object\n",
"\n",
Expand Down Expand Up @@ -172,6 +174,8 @@
" by the `fit` and `cross_validation` methods will be used. If a Ray\n",
" Dataset is provided and `num_partitions` is None, the partitioning\n",
" will be done by the `id_col`.\n",
" lag_transforms_namer : callable, optional(default=None)\n",
" Function that takes a transformation (either function or class), a lag and extra arguments and produces a name\n",
" \"\"\" \n",
" if not isinstance(models, dict) and not isinstance(models, list):\n",
" models = [models]\n",
Expand All @@ -181,13 +185,19 @@
" else:\n",
" models_with_names = models\n",
" self.models = models_with_names\n",
" if lag_transforms_namer is None:\n",
" def name_without_dots(tfm, lag, *args):\n",
" name = _build_transform_name(tfm, lag, args)\n",
" return name.replace('.', '_')\n",
" lag_transforms_namer = name_without_dots\n",
" self._base_ts = TimeSeries(\n",
" freq=freq,\n",
" lags=lags,\n",
" lag_transforms=lag_transforms,\n",
" date_features=date_features,\n",
" num_threads=num_threads,\n",
" target_transforms=target_transforms,\n",
" lag_transforms_namer=lag_transforms_namer,\n",
" )\n",
" self.engine = engine\n",
" self.num_partitions = num_partitions\n",
Expand Down Expand Up @@ -910,19 +920,19 @@
"\n",
"### DistributedMLForecast\n",
"\n",
"> DistributedMLForecast (models,\n",
"> freq:Union[int,str,pandas._libs.tslibs.offsets.Bas\n",
"> eOffset], lags:Optional[Iterable[int]]=None, lag_t\n",
"> ransforms:Optional[Dict[int,List[Union[Callable,Tu\n",
"> ple[Callable,Any]]]]]=None, date_features:Optional\n",
"> [Iterable[Union[str,Callable]]]=None,\n",
"> num_threads:int=1, target_transforms:Optional[List\n",
"> [Union[mlforecast.target_transforms.BaseTargetTran\n",
"> sform,mlforecast.target_transforms._BaseGroupedArr\n",
"> ayTargetTransform]]]=None, engine=None,\n",
"> num_partitions:Optional[int]=None)\n",
"\n",
"Multi backend distributed pipeline"
"> DistributedMLForecast (models, freq:Union[int,str],\n",
"> lags:Optional[Iterable[int]]=None, lag_transforms:\n",
"> Optional[Dict[int,List[Union[Callable,Tuple[Callab\n",
"> le,Any]]]]]=None, date_features:Optional[Iterable[\n",
"> Union[str,Callable]]]=None, num_threads:int=1, tar\n",
"> get_transforms:Optional[List[Union[mlforecast.targ\n",
"> et_transforms.BaseTargetTransform,mlforecast.targe\n",
"> t_transforms._BaseGroupedArrayTargetTransform]]]=N\n",
"> one, engine=None,\n",
"> num_partitions:Optional[int]=None,\n",
"> lag_transforms_namer:Optional[Callable]=None)\n",
"\n",
"*Multi backend distributed pipeline*"
],
"text/plain": [
"---\n",
Expand All @@ -931,19 +941,19 @@
"\n",
"### DistributedMLForecast\n",
"\n",
"> DistributedMLForecast (models,\n",
"> freq:Union[int,str,pandas._libs.tslibs.offsets.Bas\n",
"> eOffset], lags:Optional[Iterable[int]]=None, lag_t\n",
"> ransforms:Optional[Dict[int,List[Union[Callable,Tu\n",
"> ple[Callable,Any]]]]]=None, date_features:Optional\n",
"> [Iterable[Union[str,Callable]]]=None,\n",
"> num_threads:int=1, target_transforms:Optional[List\n",
"> [Union[mlforecast.target_transforms.BaseTargetTran\n",
"> sform,mlforecast.target_transforms._BaseGroupedArr\n",
"> ayTargetTransform]]]=None, engine=None,\n",
"> num_partitions:Optional[int]=None)\n",
"\n",
"Multi backend distributed pipeline"
"> DistributedMLForecast (models, freq:Union[int,str],\n",
"> lags:Optional[Iterable[int]]=None, lag_transforms:\n",
"> Optional[Dict[int,List[Union[Callable,Tuple[Callab\n",
"> le,Any]]]]]=None, date_features:Optional[Iterable[\n",
"> Union[str,Callable]]]=None, num_threads:int=1, tar\n",
"> get_transforms:Optional[List[Union[mlforecast.targ\n",
"> et_transforms.BaseTargetTransform,mlforecast.targe\n",
"> t_transforms._BaseGroupedArrayTargetTransform]]]=N\n",
"> one, engine=None,\n",
"> num_partitions:Optional[int]=None,\n",
"> lag_transforms_namer:Optional[Callable]=None)\n",
"\n",
"*Multi backend distributed pipeline*"
]
},
"execution_count": null,
Expand Down Expand Up @@ -976,7 +986,7 @@
"> dropna:bool=True,\n",
"> keep_last_n:Optional[int]=None)\n",
"\n",
"Apply the feature engineering and train the models.\n",
"*Apply the feature engineering and train the models.*\n",
"\n",
"| | **Type** | **Default** | **Details** |\n",
"| -- | -------- | ----------- | ----------- |\n",
Expand All @@ -1002,7 +1012,7 @@
"> dropna:bool=True,\n",
"> keep_last_n:Optional[int]=None)\n",
"\n",
"Apply the feature engineering and train the models.\n",
"*Apply the feature engineering and train the models.*\n",
"\n",
"| | **Type** | **Default** | **Details** |\n",
"| -- | -------- | ----------- | ----------- |\n",
Expand Down Expand Up @@ -1047,7 +1057,7 @@
"> me.DataFrame]=None,\n",
"> new_df:Optional[~AnyDataFrame]=None)\n",
"\n",
"Compute the predictions for the next `horizon` steps.\n",
"*Compute the predictions for the next `horizon` steps.*\n",
"\n",
"| | **Type** | **Default** | **Details** |\n",
"| -- | -------- | ----------- | ----------- |\n",
Expand All @@ -1072,7 +1082,7 @@
"> me.DataFrame]=None,\n",
"> new_df:Optional[~AnyDataFrame]=None)\n",
"\n",
"Compute the predictions for the next `horizon` steps.\n",
"*Compute the predictions for the next `horizon` steps.*\n",
"\n",
"| | **Type** | **Default** | **Details** |\n",
"| -- | -------- | ----------- | ----------- |\n",
Expand Down Expand Up @@ -1110,7 +1120,7 @@
"\n",
"> DistributedMLForecast.save (path:str)\n",
"\n",
"Save forecast object\n",
"*Save forecast object*\n",
"\n",
"| | **Type** | **Details** |\n",
"| -- | -------- | ----------- |\n",
Expand All @@ -1126,7 +1136,7 @@
"\n",
"> DistributedMLForecast.save (path:str)\n",
"\n",
"Save forecast object\n",
"*Save forecast object*\n",
"\n",
"| | **Type** | **Details** |\n",
"| -- | -------- | ----------- |\n",
Expand Down Expand Up @@ -1160,7 +1170,7 @@
"\n",
"> DistributedMLForecast.load (path:str, engine)\n",
"\n",
"Load forecast object\n",
"*Load forecast object*\n",
"\n",
"| | **Type** | **Details** |\n",
"| -- | -------- | ----------- |\n",
Expand All @@ -1177,7 +1187,7 @@
"\n",
"> DistributedMLForecast.load (path:str, engine)\n",
"\n",
"Load forecast object\n",
"*Load forecast object*\n",
"\n",
"| | **Type** | **Details** |\n",
"| -- | -------- | ----------- |\n",
Expand Down Expand Up @@ -1212,7 +1222,7 @@
"\n",
"> DistributedMLForecast.update (df:pandas.core.frame.DataFrame)\n",
"\n",
"Update the values of the stored series.\n",
"*Update the values of the stored series.*\n",
"\n",
"| | **Type** | **Details** |\n",
"| -- | -------- | ----------- |\n",
Expand All @@ -1228,7 +1238,7 @@
"\n",
"> DistributedMLForecast.update (df:pandas.core.frame.DataFrame)\n",
"\n",
"Update the values of the stored series.\n",
"*Update the values of the stored series.*\n",
"\n",
"| | **Type** | **Details** |\n",
"| -- | -------- | ----------- |\n",
Expand Down Expand Up @@ -1256,30 +1266,30 @@
"text/markdown": [
"---\n",
"\n",
"[source](https://github.com/Nixtla/mlforecast/blob/main/mlforecast/distributed/forecast.py#L739){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n",
"[source](https://github.com/Nixtla/mlforecast/blob/main/mlforecast/distributed/forecast.py#L747){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n",
"\n",
"### DistributedMLForecast.to_local\n",
"\n",
"> DistributedMLForecast.to_local ()\n",
"\n",
"Convert this distributed forecast object into a local one\n",
"*Convert this distributed forecast object into a local one\n",
"\n",
"This pulls all the data from the remote machines, so you have to be sure that \n",
"it fits in the scheduler/driver. If you're not sure use the save method instead."
"it fits in the scheduler/driver. If you're not sure use the save method instead.*"
],
"text/plain": [
"---\n",
"\n",
"[source](https://github.com/Nixtla/mlforecast/blob/main/mlforecast/distributed/forecast.py#L739){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n",
"[source](https://github.com/Nixtla/mlforecast/blob/main/mlforecast/distributed/forecast.py#L747){target=\"_blank\" style=\"float:right; font-size:smaller\"}\n",
"\n",
"### DistributedMLForecast.to_local\n",
"\n",
"> DistributedMLForecast.to_local ()\n",
"\n",
"Convert this distributed forecast object into a local one\n",
"*Convert this distributed forecast object into a local one\n",
"\n",
"This pulls all the data from the remote machines, so you have to be sure that \n",
"it fits in the scheduler/driver. If you're not sure use the save method instead."
"it fits in the scheduler/driver. If you're not sure use the save method instead.*"
]
},
"execution_count": null,
Expand Down Expand Up @@ -1313,7 +1323,7 @@
"> e, dropna:bool=True,\n",
"> keep_last_n:Optional[int]=None)\n",
"\n",
"Add the features to `data`.\n",
"*Add the features to `data`.*\n",
"\n",
"| | **Type** | **Default** | **Details** |\n",
"| -- | -------- | ----------- | ----------- |\n",
Expand All @@ -1340,7 +1350,7 @@
"> e, dropna:bool=True,\n",
"> keep_last_n:Optional[int]=None)\n",
"\n",
"Add the features to `data`.\n",
"*Add the features to `data`.*\n",
"\n",
"| | **Type** | **Default** | **Details** |\n",
"| -- | -------- | ----------- | ----------- |\n",
Expand Down
Loading

0 comments on commit cd41f7a

Please sign in to comment.