Source code for hcrystalball.utils

import collections.abc
import functools
import hashlib
import os
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.utils import check_random_state

from hcrystalball.exceptions import InsufficientDataLengthError
from hcrystalball.exceptions import PredictWithoutFitError


def deep_dict_update(source, overrides):
    """
    Update a nested dictionary.
    """
    if overrides is None:
        overrides = {}
    if source is None:
        source = {}

    result = source.copy()

    for key, value in overrides.items():
        if isinstance(value, collections.abc.Mapping) and value:
            returned = deep_dict_update(result.get(key, {}), value)
            result[key] = returned
        else:
            result[key] = overrides[key]
    return result
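
# Illustrative usage of deep_dict_update (the example values are invented here for clarity):
#
#     >>> deep_dict_update({"a": {"b": 1, "c": 2}}, {"a": {"c": 3}, "d": 4})
#     {'a': {'b': 1, 'c': 3}, 'd': 4}
#
# Nested mappings are merged key by key, while non-mapping values in `overrides`
# simply replace the corresponding values in `source`.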


def optional_import(module_path, class_name, caller_namespace):
    """Import optional dependencies.

    Imports class from module if importable, otherwise creates a dummy class
    with the same name as the requested class. The dummy class fails on init
    with a ModuleNotFoundError for the missing dependencies.

    Parameters
    ----------
    module_path : str
        Path to the module

    class_name : str
        Name of the class to import/mock

    caller_namespace : dict
        Namespace of the caller

    Returns
    -------
    list
        If importable, [class_name] to extend __all__ of the calling module,
        otherwise an empty list
    """
    dunder_all_extend = []
    try:
        exec(f"from {module_path} import {class_name}", {}, caller_namespace)
        dunder_all_extend.append(class_name)
    except Exception:
        exec(
            f"class {class_name}:\n"
            f"    'This is just a helper class to inform the user about missing dependencies at init time'\n"
            f"    def __init__(self, **kwargs):\n"
            f"        # init always fails\n"
            f"        from {module_path} import {class_name}\n",
            {},
            caller_namespace,
        )
    return dunder_all_extend
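
# Sketch of how a wrapper package might use optional_import (the module path and class
# below are illustrative assumptions, not necessarily the actual hcrystalball layout):
#
#     >>> __all__ = []
#     >>> __all__ += optional_import("hcrystalball.wrappers._prophet", "ProphetWrapper", globals())
#
# If the optional dependency is missing, `ProphetWrapper` still exists in the namespace,
# but instantiating it re-raises the underlying ModuleNotFoundError.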


def get_estimator_repr(model, n_char_max=10000):
    """Get the string representation of a model.

    Parameters
    ----------
    model : hcrystalball.base.TSModelWrapper, sklearn.base.BaseEstimator
        Instance of model

    n_char_max : int
        Maximum number of characters to be used for the string

    Returns
    -------
    str
        String representation of the model
    """
    return model.__repr__(N_CHAR_MAX=n_char_max).replace("\n", "").replace(" ", "")


def generate_estimator_hash(model):
    """Generate a unique hash for a model using its string representation.

    Parameters
    ----------
    model : hcrystalball.base.TSModelWrapper, sklearn.base.BaseEstimator
        Instance of model

    Returns
    -------
    str
        String containing the MD5 hash generated from the string representation of the model
    """
    model_str = get_estimator_repr(model)
    model_label_hash = hashlib.md5(model_str.encode("utf-8")).hexdigest()
    return model_label_hash


def generate_partition_hash(partition_label):
    """Generate a unique hash for a data partition.

    Parameters
    ----------
    partition_label : dict
        Label in the form {"column_1": "value_a", "column_2": "value_b"}

    Returns
    -------
    str
        String containing the MD5 hash generated from the string representation
        of the partition label
    """
    h = hashlib.md5()
    sorted_keys = sorted(partition_label, key=str.lower)
    for key in sorted_keys:
        h.update((str(key) + str(partition_label[key])).encode("utf-8"))
    return h.hexdigest()
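
# Illustrative check that generate_partition_hash is insensitive to key order
# (the partition values below are made up for the example):
#
#     >>> h1 = generate_partition_hash({"Region": "region_0", "Product": "product_1"})
#     >>> h2 = generate_partition_hash({"Product": "product_1", "Region": "region_0"})
#     >>> h1 == h2
#     True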


def check_X_y(func):
    """Check that X and y have the expected shapes and types"""

    @functools.wraps(func)
    def _check_X_y(self, X, y=None):
        if not isinstance(X, pd.DataFrame):
            raise TypeError("`X` must be a pandas dataframe.")
        if len(X) < 3:
            raise InsufficientDataLengthError(
                "`X` must have at least 3 observations. Please provide valid data."
            )
        if not pd.api.types.is_datetime64_dtype(X.index):
            raise ValueError(f"`X` must contain index of type datetime. Your index is {X.index}")
        if y is not None:
            if not isinstance(y, (pd.Series, np.ndarray)):
                raise TypeError(
                    f"`y` must be either pandas series or numpy ndarray. You provided `{type(y)}`"
                )
            if len(X) != len(y):
                raise ValueError(
                    f"`X` and `y` must have the same length. `len(X)={len(X)}` and `len(y)={len(y)}`"
                )
            if y.ndim != 1:
                raise ValueError(f"`y` must have 1 dimension. You provided y with ndim={y.ndim}")
        return func(self, X, y)

    return _check_X_y


def enforce_y_type(func):
    """Enforce y to be a 1-dimensional numpy array when a pd.Series is passed"""

    @functools.wraps(func)
    def _enforce_y_type(self, X, y):
        if isinstance(y, pd.Series):
            y = y.values
        return func(self, X, y)

    return _enforce_y_type


def check_fit_before_predict(func):
    """Check that the model has been fitted before the predict method is called"""

    @functools.wraps(func)
    def _check_fit_before_predict(self, X):
        if not self.fitted:
            raise PredictWithoutFitError(model_name=self.name)
        return func(self, X)

    return _check_fit_before_predict


def get_estimator_name(estimator):
    """Get the name of the estimator.

    Parameters
    ----------
    estimator : sklearn.base.BaseEstimator, sklearn.pipeline.Pipeline
        Instance of model

    Returns
    -------
    str
        Name of the estimator. For TS Framework models the name attribute of the
        estimator is returned, while for other model instances the class name is
        returned. If the input estimator is a pipeline, the name of the last step
        in the pipeline, which is supposed to be a regressor/classifier, is returned.
    """

    def __get_estimator_name(estimator, name):
        if isinstance(estimator, Pipeline):
            if name.strip() == "":
                name_tmp = estimator.steps[-1][0]
            else:
                name_tmp = "__".join([name, estimator.steps[-1][0]])
            name = __get_estimator_name(estimator.steps[-1][1], name_tmp)
            return name
        else:
            if hasattr(estimator, "name"):
                if name.strip() == "":
                    return estimator.name
                else:
                    return "__".join([name, estimator.name])
            else:
                if name.strip() == "":
                    return estimator.__class__.__name__
                else:
                    return "__".join([name, estimator.__class__.__name__])

    name = __get_estimator_name(estimator, "")
    return name
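
# A minimal sketch of get_estimator_name with plain scikit-learn estimators
# (hcrystalball wrappers would expose a `name` attribute and be handled via that branch):
#
#     >>> from sklearn.linear_model import LinearRegression
#     >>> from sklearn.preprocessing import StandardScaler
#     >>> pipe = Pipeline([("scaler", StandardScaler()), ("regression", LinearRegression())])
#     >>> get_estimator_name(pipe)
#     'regression__LinearRegression'
#     >>> get_estimator_name(LinearRegression())
#     'LinearRegression'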


def get_sales_data(n_dates=100, n_assortments=2, n_states=3, n_stores=3):
    """Load a subset of the Rossmann store sales dataset.

    This function loads a subset of the Rossmann store sales dataset from
    https://www.kaggle.com/c/rossmann-store-sales with the 100 stores with the highest
    sales overall. The data is for stores in Germany, in the date range ``2015-04-23``
    to ``2015-07-31``.

    The data is returned as a `pandas.DataFrame`:

    - ``Date`` - DataFrame index, date of recorded sales numbers
    - ``Store`` - a unique Id for each store
    - ``Sales`` - the turnover for any given day (this is what you are predicting)
    - ``Open`` - an indicator for whether the store was open: 0 = closed, 1 = open
    - ``Promo`` - indicates whether a store is running a promo on that day
    - ``SchoolHoliday`` - indicates if the (Store, Date) was affected by the closure of public schools
    - ``StoreType`` - differentiates between 4 different store models: a, b, c, d
    - ``Assortment`` - describes an assortment level: a = basic, b = extra, c = extended
    - ``Promo2`` - Promo2 is a continuing and consecutive promotion for some stores:
      0 = store is not participating, 1 = store is participating
    - ``State`` - String code for the state in Germany that the store is in
      (see https://en.wikipedia.org/wiki/States_of_Germany)
    - ``HolidayCode`` - the ``State`` prefixed with ``DE-``.

    ``Assortment``, ``State`` and ``Store`` serve as data partitioning columns.
    ``HolidayCode`` provides country specific holidays for the given ``Date``.
    ``Open``, ``Promo``, ``Promo2`` and ``SchoolHoliday`` serve as exogenous variables.
    ``Sales`` is the target column we will predict.

    Parameters
    ----------
    n_dates : int
        Number of days to be included for each series

    n_assortments : int
        Number of assortments to include

    n_states : int
        Number of states to include

    n_stores : int
        Number of stores to include

    Example
    -------
    >>> get_sales_data()
                Store  Sales  Open  Promo  SchoolHoliday StoreType Assortment  Promo2 State HolidayCode
    Date
    2015-04-23    906   8162  True  False          False         a          a   False    HE       DE-HE
    2015-04-23    251  16573  True  False          False         a          c   False    NW       DE-NW
    2015-04-23    320  13114  True  False          False         a          c   False    SH       DE-SH
    2015-04-23    335  11189  True  False          False         b          a    True    NW       DE-NW
    2015-04-23    336  10184  True  False          False         a          a   False    HE       DE-HE
    ...           ...    ...   ...    ...            ...       ...        ...     ...   ...         ...
    2015-07-31    817  23093  True   True           True         a          a   False    BE       DE-BE
    2015-07-31    831  15152  True   True           True         a          a   False    NW       DE-NW
    2015-07-31    906  15131  True   True           True         a          a   False    HE       DE-HE
    2015-07-31    586  17879  True   True           True         a          c   False    NW       DE-NW
    2015-07-31    251  22205  True   True           True         a          c   False    NW       DE-NW

    Returns
    -------
    pandas.DataFrame
        Rossmann store sales subset, see description above.

    Raises
    ------
    ValueError
        Raised if the requested number of assortments is higher than what the dataset
        holds, if there are fewer than the requested number of states within any
        assortment, or if there are not enough valid combinations of assortments,
        states and stores.
    """
    data_path = Path(__file__).parent / "data/rossmann_train_rich_top_100.csv"
    df = pd.read_csv(data_path, parse_dates=["Date"], index_col="Date")

    valid_n_stores = (
        (df.groupby(["Assortment", "State"])["Store"].nunique()).loc[lambda x: x > n_stores].reset_index()
    )
    if valid_n_stores["Assortment"].nunique() < n_assortments:
        raise ValueError(
            f"There aren't enough Assortments ({valid_n_stores['Assortment'].nunique()} "
            f"from requested {n_assortments}) "
            f"complying with having {n_stores} stores in each of {n_states} states"
        )
    if any(valid_n_stores.groupby("Assortment")["State"].nunique() < n_states):
        raise ValueError(
            f"There aren't enough States from requested {n_states} within one Assortment "
            f"complying with having at least {n_stores} stores "
            f"{valid_n_stores.groupby('Assortment')['State'].nunique()}"
        )

    valid_assortment_states = (
        valid_n_stores.set_index(["Assortment", "State"])
        .groupby(["Assortment"])["Store"]
        .nlargest(n_states)
        .reset_index(level=2)
        .reset_index(level=1)
        .set_index(["Assortment", "State"])
        .index
    )
    if len(valid_assortment_states) < n_assortments * n_states:
        raise ValueError(
            f"There are not {n_assortments} store types all having at least {n_states} "
            f"unique states with at least {n_stores} stores within the data. "
            f"Try lowering `n_assortments`, `n_states` or `n_stores` "
            f"{valid_assortment_states}"
        )

    valid_stores = (
        df.set_index(["Assortment", "State"])
        .loc[lambda x: x.index.isin(valid_assortment_states[: n_assortments * n_states])]
        .reset_index()
        .groupby(["Assortment", "State", "Store"], as_index=False)["Sales"]
        .sum()
        .set_index("Store")
        .groupby(["Assortment", "State"])["Sales"]
        .nlargest(n_stores)
        .reset_index()["Store"]
        .values
    )

    data = df.loc[
        lambda x: (x["Store"].isin(valid_stores)) & (x.index.isin(x.index.drop_duplicates()[:n_dates]))
    ].sort_index()

    if len(data) != n_assortments * n_states * n_stores * n_dates:
        raise ValueError(
            f"There is not enough data for valid combinations of stores, store types and states {data}"
        )

    return data


def generate_tsdata(n_dates=365, random_state=None):
    """Generate dummy daily time series data compatible with the hcrystalball API.

    Parameters
    ----------
    n_dates : int
        Number of days to be included in the data

    random_state : int or RandomState instance
        Control random number generation following the scikit-learn pattern.
        Default is to use the global Numpy RNG. For reproducible results pass an int.
        To draw from a given RNG pass a RandomState.

    Returns
    -------
    X : pandas.DataFrame
        DataFrame with `pandas.DatetimeIndex`
    y : pandas.Series
        Target values with `pandas.DatetimeIndex`
    """
    random_state = check_random_state(random_state)
    trend = np.arange(n_dates)
    noise = random_state.uniform(0, 1, len(trend))
    yearly_seasonality = 4 + 3 * np.sin(trend / 365 * 1 * np.pi)
    monthly_seasonality = 4 * np.sin(trend / 365 * 12 * np.pi + 365 / 2)
    weekly_seasonality = 4 * np.sin(trend / 365 * 52 * np.pi)
    trend = np.linspace(0, 5, n_dates)

    date_index = pd.date_range(start="2017-01-01", periods=len(trend), freq="D")
    y = yearly_seasonality + monthly_seasonality + weekly_seasonality + trend + noise
    y = pd.Series(y, index=date_index, name="target")
    X = pd.DataFrame(index=date_index)

    return X, y
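
# Quick usage sketch:
#
#     >>> X, y = generate_tsdata(n_dates=30, random_state=42)
#     >>> y.index.equals(X.index), len(y)
#     (True, 30)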


def generate_multiple_tsdata(
    n_dates=10, n_regions=2, n_plants=3, n_products=4, country="DE", random_state=None
):
    """Provide an easy way to generate dummy data for tests or tutorial purposes.

    Parameters
    ----------
    n_dates : int
        Length of one time series

    n_regions : int
        Number of regions within column 'Region'

    n_plants : int
        Number of plants within column 'Plant'

    n_products : int
        Number of products within column 'Product'

    country : str
        ISO code of country or country-region

    random_state : int or RandomState instance
        Control random number generation following the scikit-learn pattern.
        Default is to use the global Numpy RNG. For reproducible results pass an int.
        To draw from a given RNG pass a RandomState.

    Returns
    -------
    pandas.DataFrame
        Data with datetime index and the following columns

        - "Region", "Plant", "Product" - for partitioning the data
        - "Country" - for holidays code
        - "Raining" - as bool exogenous variable
        - "Quantity" - as target variable
    """
    random_state = check_random_state(random_state)
    str_index = pd.date_range("2018-01-01", periods=n_dates, freq="D")
    regions = [f"region_{i}" for i in range(n_regions)]
    plants = [f"plant_{i}" for i in range(n_plants)]
    products = [f"product_{i}" for i in range(n_products)]

    dfs = []
    for region in regions:
        df_tmp = pd.DataFrame(
            columns=["Date", "Region", "Plant", "Product", "Country", "Raining", "Quantity"],
            index=range(len(str_index)),
        )
        df_tmp.loc[:, "Region"] = region
        df_tmp.loc[:, "Country"] = country
        for plant in plants:
            df_tmp.loc[:, "Plant"] = plant
            for product in products:
                df_tmp.loc[:, "Date"] = str_index
                df_tmp.loc[:, "Product"] = product
                _, y = generate_tsdata(n_dates=n_dates)
                df_tmp.loc[:, "Quantity"] = y.values
                df_tmp.loc[:, "Raining"] = random_state.choice(a=[False, True], size=(n_dates,))
                dfs.append(df_tmp.copy())

    return pd.concat(dfs).set_index("Date")
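
# Usage sketch showing the shape of the generated panel data:
#
#     >>> df = generate_multiple_tsdata(n_dates=5, n_regions=1, n_plants=2, n_products=2)
#     >>> len(df)
#     20
#     >>> sorted(df.columns)
#     ['Country', 'Plant', 'Product', 'Quantity', 'Raining', 'Region']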


def filter_statsmodels_warnings(warnings_to_filter=None):
    """Filter warnings that the statsmodels library typically emits.

    Parameters
    ----------
    warnings_to_filter : list, optional
        List of warnings to add to `warnings.simplefilter`. By default None, which
        translates to ConvergenceWarning, ValueWarning, FutureWarning and UserWarning.
    """
    import warnings

    from statsmodels.tools import sm_exceptions as sme

    default_warnings = [sme.ConvergenceWarning, FutureWarning, UserWarning, sme.ValueWarning]
    warnings_to_filter = warnings_to_filter if warnings_to_filter is not None else default_warnings
    for warning in warnings_to_filter:
        warnings.simplefilter("ignore", warning)


class _suppress_stdout_stderr(object):
    """
    A context manager for doing a "deep suppression" of stdout and stderr in Python,
    i.e. will suppress all print, even if the print originates in a compiled
    C/Fortran sub-function.

    This will not suppress raised exceptions, since exceptions are printed to stderr
    just before a script exits, and after the context manager has exited
    (at least, I think that is why it lets exceptions through).

    References
    ----------
    https://github.com/facebook/prophet/issues/223
    """

    def __init__(self):
        # Open a pair of null files
        self.null_fds = [os.open(os.devnull, os.O_RDWR) for x in range(2)]
        # Save the actual stdout (1) and stderr (2) file descriptors.
        self.save_fds = [os.dup(1), os.dup(2)]

    def __enter__(self):
        # Assign the null pointers to stdout and stderr.
        os.dup2(self.null_fds[0], 1)
        os.dup2(self.null_fds[1], 2)

    def __exit__(self, *_):
        # Re-assign the real stdout/stderr back to (1) and (2)
        os.dup2(self.save_fds[0], 1)
        os.dup2(self.save_fds[1], 2)
        # Close the null files
        for fd in self.null_fds + self.save_fds:
            os.close(fd)
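
# Minimal sketch of the suppression context manager in use; print output (including
# output from compiled extensions) is redirected to the null device, so nothing is shown:
#
#     >>> with _suppress_stdout_stderr():
#     ...     print("this is silenced")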


def set_verbosity(func):
    """Decorator that suppresses stdout/stderr of the wrapped method
    unless the estimator's `hcb_verbose` attribute is True."""

    @functools.wraps(func)
    def _set_verbosity(self, *args, **kwargs):
        if self.hcb_verbose:
            return func(self, *args, **kwargs)
        else:
            with _suppress_stdout_stderr():
                return func(self, *args, **kwargs)

    return _set_verbosity
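
# Sketch of applying set_verbosity (the class below is a hypothetical stand-in for
# an hcrystalball model wrapper exposing an `hcb_verbose` attribute):
#
#     class _QuietModel:
#         hcb_verbose = False
#
#         @set_verbosity
#         def fit(self, X, y=None):
#             print("noisy third-party output")  # suppressed because hcb_verbose is False
#             return self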


__all__ = [
    "get_estimator_repr",
    "get_sales_data",
    "generate_estimator_hash",
    "generate_multiple_tsdata",
    "generate_partition_hash",
    "generate_tsdata",
    "optional_import",
    "set_verbosity",
]