Source code for hcrystalball.model_selection._split

import numpy as np


class FinerTimeSplit:
    """Time series cross-validator.

    Provide train/test indices to split data in train/test sets.
    The corresponding training set consists only of observations that
    occurred prior to the observation that forms the test set. Thus, no
    future observations can be used in constructing the forecast.

    Parameters
    ----------
    n_splits : int
        Number of splits.

    horizon : int
        Number of steps ahead to make the forecast for.

    between_split_lag : int, optional
        Number of observations between the starts of consecutive splits.
        When ``None``, consecutive test sets start ``horizon`` observations
        apart (i.e. they do not overlap).
    """

    def __init__(self, n_splits=10, horizon=10, between_split_lag=None):
        self.n_splits = n_splits
        self.horizon = horizon
        self.between_split_lag = between_split_lag

    def split(self, X, y=None, groups=None):
        """Generate indices to split the data into training and test sets.

        Similar to the scikit-learn API ``split``. The test sets are taken
        from the tail of the data and the train/test indices are generated
        sequentially.

        Parameters
        ----------
        X : array-like
            Data container to be split to train and test data

        y : Any
            ignored

        groups : Any
            ignored

        Yields
        ------
        train : numpy.ndarray
            Positional indices of the training set for the current split.

        test : numpy.ndarray
            Positional indices of the test set for the current split.

        Raises
        ------
        TypeError
            If ``X`` does not support ``len``.

        ValueError
            If ``horizon`` or ``between_split_lag`` is lower than 1 or if
            ``X`` does not contain enough samples.

        Notes
        -----
        The returned object is a generator, so the errors above are raised
        when it is first iterated, not when ``split`` is called.
        """
        return self._split(X)

    def _split(self, data):
        """Generate indices to split the data into training and test sets.

        Parameters
        ----------
        data : array-like
            Data container to be split to train and test data

        Yields
        ------
        train : numpy.ndarray
            Positional indices of all observations preceding the test set.

        test : numpy.ndarray
            Positional indices of the ``horizon`` observations forming
            the test set.

        Raises
        ------
        TypeError
            If ``data`` does not support ``len``.

        ValueError
            If ``horizon`` or ``between_split_lag`` is lower than 1 or if
            ``data`` does not contain enough samples.
        """
        try:
            n_samples = len(data)
        except TypeError as exc:
            raise TypeError(
                f"Data must be an array-like object, but it does not seem to be the case. "
                f"You provided {data}"
            ) from exc

        if (self.between_split_lag is not None and self.between_split_lag < 1) or self.horizon < 1:
            raise ValueError(
                f"`between_split_lag`({self.between_split_lag}) and "
                f"`horizon`({self.horizon}) have to be at least 1"
            )

        # The larger of the two step sizes drives the minimum amount of data required.
        max_obs = (
            self.horizon
            if self.between_split_lag is None
            else max(self.horizon, self.between_split_lag)
        )
        if n_samples < self.n_splits * max_obs:
            raise ValueError(
                f"Cannot have number of samples({n_samples}) lower than the number "
                f"of `n_splits`({self.n_splits}) * `horizon`({self.horizon}), "
                f"or `n_splits`({self.n_splits}) * `between_split_lag`({self.between_split_lag}) "
                f"if you provided `between_split_lag`"
            )

        indices = np.arange(n_samples)

        if self.between_split_lag is not None:
            # Test sets start `between_split_lag` apart; both range bounds are
            # shifted by (horizon - lag) so that the last test set ends exactly
            # at the last observation.
            test_starts = range(
                n_samples
                - (self.between_split_lag * self.n_splits)
                - (self.horizon - self.between_split_lag),
                n_samples - (self.horizon - self.between_split_lag),
                self.between_split_lag,
            )
        else:
            # Non-overlapping test sets covering the last n_splits * horizon observations.
            test_starts = range(n_samples - (self.horizon * self.n_splits), n_samples, self.horizon)

        for test_start in test_starts:
            yield (
                indices[:test_start],
                indices[test_start : test_start + self.horizon],
            )

    def get_n_splits(self, X=None, y=None, groups=None):
        """Return number of splits regardless of provided parameters

        Parameters
        ----------
        X : Any
            ignored

        y : Any
            ignored

        groups : Any
            ignored

        Returns
        -------
        int
            Number of splits
        """
        return self.n_splits