from abc import ABCMeta, abstractmethod
import pandas as pd
from hcrystalball.exceptions import InsufficientDataLengthError
from hcrystalball.wrappers._base import TSModelWrapper
from hcrystalball.wrappers._base import tsmodel_wrapper_constructor_factory
from hcrystalball.utils import check_X_y, enforce_y_type, check_fit_before_predict
class BaseSklearnWrapper(TSModelWrapper, metaclass=ABCMeta):
    """Base class turning Sklearn-API regressors into autoregressive time series models.

    Stores the training data during ``fit`` and performs the actual model fitting
    lazily in ``predict``, because the autoregressive data transformation depends on
    the prediction horizon, which is only known from the length of ``X`` at predict time.
    """

    def __reduce__(self):
        """Resorting to reduce for unpickling to sneak in
        a class definition created at runtime (see _ClassInitializer below)
        """
        return (_ClassInitializer(), (self.model_class,), self.__dict__)

    @abstractmethod
    def __init__(self):
        pass

    def _transform_data_to_tsmodel_input_format(self, X, y=None, horizon=None):
        """Transform data into Sklearn API required form and shift them.

        Shift is done in autoregressive format with `lags` columns based on prediction
        horizon which is derived from length of provided input data for `predict` call.

        Parameters
        ----------
        X : pandas.DataFrame
            Input features.
        y : array_like, (1d)
            Target vector.
        horizon : int
            Number of steps used to shift the data.

        Returns
        -------
        X, y
            X - pandas.DataFrame
            y - numpy.ndarray

        Raises
        ------
        InsufficientDataLengthError
            If the stored target is too short for the requested `lags` and `horizon`.
        """
        if y is not None:
            if self.lags + horizon > len(self._y):
                raise InsufficientDataLengthError(
                    f"Sum of model lags ({self.lags}) and forecasting horizon ({horizon}) "
                    f"cannot be bigger than length of y ({len(self._y)})."
                )
            # Align the target with the lagged features: the first usable training
            # sample needs `lags` past observations shifted by `horizon` steps.
            y = self._y[self.lags + horizon - 1 :]
        X = self._add_lag_features(X, self._y, horizon)
        if X.filter(like="_holiday_").shape[1] > 0:
            X = self._adjust_holidays(X)
        X = X.astype(float)
        return X, y

    @staticmethod
    def _adjust_holidays(X):
        """Transform 'holiday' columns to binary feature.

        Parameters
        ----------
        X : pandas.DataFrame
            Input features with 'holiday' column.

        Returns
        -------
        pandas.DataFrame
            Holiday feature in numeric form.
        """
        # A non-empty string marks a holiday; empty string marks a regular day.
        return X.assign(**{col: X[col] != "" for col in X.filter(like="_holiday_").columns})

    @enforce_y_type
    @check_X_y
    def fit(self, X, y):
        """Store X in self._X and y in self._y and instantiate the model.

        Actual model fitting is done in `predict` method since the way model is fitted
        depends on `prediction` horizon which is known only during `predict` call.

        Parameters
        ----------
        X : pandas.DataFrame
            Input features.
        y : array_like, (1d)
            Target vector.

        Returns
        -------
        self
        """
        self._X, self._y = X, y
        self.model = self._init_tsmodel(self.model_class)
        self.fitted = True
        return self

    def _predict(self, X):
        """Transform stored training data to autoregressive form with `lags` features,
        fit the model and output prediction based on transformed X features.

        Parameters
        ----------
        X : pandas.DataFrame
            Input features.

        Returns
        -------
        pandas.DataFrame
            Prediction is stored in column with name being the `name` of the wrapper.
        """
        X_fit, y_fit = self._transform_data_to_tsmodel_input_format(self._X, self._y, len(X))
        if self.fit_params:
            self.model = self.model.fit(X_fit, y_fit, **self.fit_params)
        else:
            self.model = self.model.fit(X_fit, y_fit)
        X_pred, _ = self._transform_data_to_tsmodel_input_format(X)
        pred = self.model.predict(X_pred)
        return pd.DataFrame(data=pred.reshape(-1, 1), columns=[self.name], index=X.index)

    @check_fit_before_predict
    def predict(self, X):
        """Predict using provided Sklearn compatible regressor.

        If `optimize_for_horizon` is set to True, then new model is created for
        each new horizon and fitted independently
        (i.e. len(X)=5 --> horizon=5 --> 5 models will be fitted).
        The final prediction is then combination of single point forecast of individual
        models for different horizons.

        Parameters
        ----------
        X : pandas.DataFrame
            Input features.

        Returns
        -------
        pandas.DataFrame
            Prediction is stored in column with name being the `name` of the wrapper.

        Raises
        ------
        InsufficientDataLengthError
            If the requested horizon exceeds what the stored training data can support.
        """
        # Each forecasted step consumes `lags` past observations, so the horizon
        # cannot exceed the number of stored observations minus the model lags.
        # (Original condition `len(X) > len(self._X) + 3` contradicted its own
        # "must have at least" message; this mirrors the guard used when shifting.)
        if len(X) + self.lags > len(self._X):
            raise InsufficientDataLengthError(
                f"`X` can have at most {len(self._X) - self.lags} observations. "
                f"Please provide valid data."
            )
        if self.optimize_for_horizon:
            # Fit one model per horizon and keep only the last point of each forecast.
            preds = pd.concat(
                [self._predict(X.iloc[:index, :]).tail(1) for index in range(1, X.shape[0] + 1)]
            )
        else:
            preds = self._predict(X)
        preds.index = X.index
        return self._clip_predictions(preds)

    def _add_lag_features(self, X, y, horizon=None):
        """Transform input data X, y into autoregressive form - shift
        them appropriately based on horizon and create `lags` columns.

        Parameters
        ----------
        X : pandas.DataFrame
            Input features.
        y : array_like, (1d)
            Target vector.
        horizon : int
            Length of X for `predict` method.

        Returns
        -------
        pandas.DataFrame
            Shifted dataframe with `lags` columns.
        """
        shift = horizon if horizon else 0
        # On the prediction path (no horizon), keep only the most recent
        # observations needed to populate the lag columns.
        y = y if horizon else y[-(len(X) + self.lags - 1) :]
        lag_features = [pd.Series(y, name=f"lag_{i}").shift(i + shift) for i in range(self.lags)]
        X_lags = pd.concat(lag_features, axis=1)
        X_lags = X_lags if horizon else X_lags.dropna().reset_index(drop=True)
        X = X.reset_index(drop=True).join(X_lags).dropna()
        return X
def _get_sklearn_wrapper(model_cls):
    """Factory function returning the model specific SklearnWrapper with provided `model_cls` parameters.

    This function is required for sklearn compatibility since our SklearnWrapper
    needs to have all parameters of `model_cls` set already during SklearnWrapper definition time.
    This factory function is not needed in case of
    other wrappers since the regressor is already part of the wrapper.

    Parameters
    ----------
    model_cls : class of sklearn compatible regressor
        i.e. LinearRegressor, GradientBoostingRegressor

    Example
    -------
    >>> from hcrystalball.wrappers._sklearn import _get_sklearn_wrapper
    >>> from sklearn.ensemble import RandomForestRegressor
    >>> est = _get_sklearn_wrapper(RandomForestRegressor)(max_depth=6, clip_predictions_lower=0.)
    >>> est
    SklearnWrapper(bootstrap=True, ccp_alpha=0.0, clip_predictions_lower=0.0,
                   clip_predictions_upper=None, criterion='mse', fit_params=None,
                   lags=3, max_depth=6, max_features='auto', max_leaf_nodes=None,
                   max_samples=None, min_impurity_decrease=0.0,
                   min_impurity_split=None, min_samples_leaf=1, min_samples_split=2,
                   min_weight_fraction_leaf=0.0, n_estimators=100, n_jobs=None,
                   name='sklearn', oob_score=False, optimize_for_horizon=False,
                   random_state=None, verbose=0, warm_start=False)

    Returns
    -------
    SklearnWrapper
    """

    class SklearnWrapper(BaseSklearnWrapper):
        """Wrapper for regressors compatible with Sklearn-API.

        This wrapper allows you to use Sklearn-API regressors as autoregressive models
        for time series predictions. All model specific parameters will be passed to provided
        regressor class (even though there is no explicit *model_kwargs).

        One side effect of the current implementation is very quick `fit` method since
        all of the actual model fitting is done in `predict` method resulting
        in longer inference time.

        Parameters
        ----------
        name : str
            Name of the model instance, used also as column name for returned prediction.
        lags : int
            Number of last observations of dependent variable used for modeling (lags = 2, X = yt-1, yt-2).
        fit_params : dict
            Parameters passed to `fit` method of the regressor, i.e. sample_weight.
        optimize_for_horizon : bool
            Whether new model should be fitted for each horizon (i.e. horizon 3 will produce 3 models,
            first for horizon 1, second for horizon 2 and third for horizon 3), this option ensures that
            autoregressive model is using for each horizon the most recent observation possible.
        clip_predictions_lower : float
            Minimal value allowed for predictions - predictions will be clipped to this value.
        clip_predictions_upper : float
            Maximum value allowed for predictions - predictions will be clipped to this value.
        """

        # Regressor class captured at wrapper-definition time; used by fit/_init_tsmodel
        # and by __reduce__ when pickling.
        model_class = model_cls

        # The decorator synthesizes the real constructor, merging the regressor's own
        # parameters with the wrapper-specific ones declared below; hence the `pass` body.
        @tsmodel_wrapper_constructor_factory(model_cls)
        def __init__(
            self,
            lags=3,
            name="sklearn",
            fit_params=None,
            optimize_for_horizon=False,
            clip_predictions_lower=None,
            clip_predictions_upper=None,
        ):
            pass

    return SklearnWrapper
def get_sklearn_wrapper(model_cls, **model_params):
    """Factory function returning the model specific SklearnWrapper with provided `model_cls` parameters.

    This function is required for sklearn compatibility since our SklearnWrapper
    needs to have all parameters of `model_cls` set already during SklearnWrapper definition time.
    This factory function is not needed in case of other wrappers since
    the regressor is already part of the wrapper.

    Parameters
    ----------
    model_cls : class of sklearn compatible regressor
        i.e. LinearRegressor, GradientBoostingRegressor
    model_params :
        `model_cls` specific parameters (e.g. max_depth) and/or
        SklearnWrapper specific parameters (e.g. clip_predictions_lower)

    Example
    -------
    >>> from hcrystalball.wrappers import get_sklearn_wrapper
    >>> from sklearn.ensemble import RandomForestRegressor
    >>> est = get_sklearn_wrapper(RandomForestRegressor, max_depth=6, clip_predictions_lower=0.)
    >>> est
    SklearnWrapper(bootstrap=True, ccp_alpha=0.0, clip_predictions_lower=0.0,
                   clip_predictions_upper=None, criterion='mse', fit_params=None,
                   lags=3, max_depth=6, max_features='auto', max_leaf_nodes=None,
                   max_samples=None, min_impurity_decrease=0.0,
                   min_impurity_split=None, min_samples_leaf=1, min_samples_split=2,
                   min_weight_fraction_leaf=0.0, n_estimators=100, n_jobs=None,
                   name='sklearn', oob_score=False, optimize_for_horizon=False,
                   random_state=None, verbose=0, warm_start=False)

    Returns
    -------
    SklearnWrapper
    """
    # Build the runtime wrapper class, then instantiate it with the mixed
    # regressor / wrapper keyword arguments.
    return _get_sklearn_wrapper(model_cls)(**model_params)
class _ClassInitializer:
    """Utility class helping with pickling/unpickling SklearnWrapper.

    The class definition of a wrapped sklearn model only exists at runtime,
    created when '_get_sklearn_wrapper' is invoked, so pickling/unpickling such
    an object would normally fail: the class cannot be looked up by the
    unpickler. Instances of this helper act as the callable returned from
    'BaseSklearnWrapper.__reduce__' — called during unpickling, they produce an
    "empty" object and swap its class for a freshly generated wrapper class.

    https://stackoverflow.com/questions/19855156/whats-the-exact-usage-of-reduce-in-pickler
    """

    def __call__(self, model_class):
        # Create a placeholder instance, then rebind it to the runtime-generated
        # wrapper class so the unpickler can restore its state onto it.
        shell = _ClassInitializer()
        shell.__class__ = _get_sklearn_wrapper(model_class)
        return shell
# Public API of this module; the underscore-prefixed factory and helper classes
# are implementation details.
__all__ = ["get_sklearn_wrapper"]