Source code for hcrystalball.feature_extraction._holiday_transformer
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from workalendar.registry import registry
[docs]class HolidayTransformer(TransformerMixin, BaseEstimator):
"""Generate holiday feature based on provided ISO code
Parameters
----------
country_code: str
ISO code if country/region
country_code_column: str
name of the column which have the ISO code of the country/region
days_before: int
number of days before the holiday which will be taken into account
(i.e. 2 means that new bool column will be created and will be True for 2 days before holidays,
otherwise False)
days_after: int
number of days after the holiday which will be taken into account
(i.e. 2 means that new bool column will be created and will be True for 2 days after holidays,
otherwise False)
bridge_days: bool
overlaping `days_before` and `days_after` feature which serves for modeling between
holidays working days
Please be aware that you cannot provide both country_code and country_code_column
during initialization since this would be ambuguious. If you provide `country_code_column`
instead of `country_code` the ISO code found in the column will be assigned into `country_code` column.
"""
def __init__(
self,
country_code=None,
country_code_column=None,
days_before=0,
days_after=0,
bridge_days=False,
):
self.country_code = country_code
self.unified_country_code = country_code
self.country_code_column = country_code_column
self.days_before = days_before
self.days_after = days_after
self.bridge_days = bridge_days
if self.country_code is None and self.country_code_column is None:
raise ValueError("You need to provide `country_code` or `country_code_column`")
if self.country_code and self.country_code_column:
raise ValueError("Provide `country_code` or `country_code_column`, not both")
self._col_name = f"_holiday_{self.country_code or self.country_code_column}"
@property
def unified_country_code(self):
"""Utility storing country code or unique value from country_code_column"""
return self._unified_country_code
[docs] def get_feature_names(self):
"""Return list with features which the transformer generates"""
return [self._col_name]
@unified_country_code.setter
def unified_country_code(self, value):
if value is not None and value not in list(registry.region_registry.keys()):
raise ValueError("Unknown `country_code`. For list of valid codes please look at workalendar.")
self._unified_country_code = value
[docs] def fit(self, X, y=None):
"""Check if `date_col` has daily frequency
This check is in `fit` method since pandas.infer_freq is used which requires at least 3 observations.
Parameters
----------
X : pandas.DataFrame
Input features.
y : Any
Ignored
Returns
-------
HolidayTransformer
self
Raises
------
ValueError
in case daily frequency is not used or very few datapoints are provided in X
"""
if pd.infer_freq(X.index) != "D":
raise ValueError(
f"HolidayTransformer can be used only with daily frequency in index. "
f"Your index is of type {type(X.index)} with frequency {pd.infer_freq(X.index)}"
)
return self
[docs] def transform(self, X, y=None):
"""Create data with 'holiday' colummn
Columns contains names of the holidays based on provided 'date' column
Parameters
----------
X : pandas.DataFrame
Input features.
y : numpy.ndarray
iIgnored.
Returns
-------
pandas.DataFrame
DataFrame with `self._col_name` column including names of holidays for each of the date
Raises
------
KeyError
if 'country_code_column' is not found in X
ValueError
if country_code_column has more than 1 value in X
"""
if self.country_code_column:
if self.country_code_column not in X.columns:
raise KeyError(
f"Column {self.country_code_column} provided as "
f"`country_code_column` is not in dataframe!"
)
if X[self.country_code_column].nunique() != 1:
raise ValueError(
f"There needs to be only one unique value in entire `country_code_column` column. "
f"These values were found {X[self.country_code_column].unique()}!"
)
self.unified_country_code = X[self.country_code_column].unique()[0]
years = X.index.year.unique().tolist() + [max(X.index.year)]
cal = registry.region_registry[self.unified_country_code]()
holidays = (
pd.concat(
[pd.DataFrame(data=cal.holidays(year), columns=["date", self._col_name]) for year in years]
)
# one day could have multiple public holidays
.drop_duplicates(subset="date").set_index("date")
)
df = (
pd.merge(X, holidays, left_index=True, right_index=True, how="left")
.fillna({self._col_name: ""})
.drop(columns=[self.country_code_column], errors="ignore")
)
df = self._get_day_around_holiday_feature(
df, f"_{self.days_before}_before{self._col_name}", -self.days_before
)
df = self._get_day_around_holiday_feature(
df, f"_{self.days_after}_after{self._col_name}", self.days_after
)
if self.bridge_days:
if self.days_before == 0 or self.days_before == 0:
raise ValueError(
"""`bridge_days` feature is created only if `days_before`
and `days_before` are both greater than 0 """
)
else:
df = df.assign(
**{
f"_bridge{self._col_name}": lambda df: (
df[
[
f"_{self.days_before}_before{self._col_name}",
f"_{self.days_after}_after{self._col_name}",
]
].all(axis=1)
)
}
)
return df
def _get_day_around_holiday_feature(self, df, col_name, days):
"""
Add new boolean column into pandas.DataFrame with number of `days` being True
before or after the public holiday depending on `when` parameter.
Parameters
----------
df : pandas.DataFrame
column with `self._col_name` and datetime index
col_name : str
column name of new feature
days : int
number of days taken into account (with 0 doesn't create any column)
Returns
-------
pandas.DataFrame
DataFrame with `self._col_name` column and additional column `col_name` if `days` > 0
"""
for day in range(1, abs(days) + 1):
day = day if days > 0 else -day
df = df.assign(**{f"_{col_name}_{day}": lambda df: ((df[self._col_name] != "").shift(day))})
cols = df.filter(like=f"_{col_name}_").columns
if days != 0:
# all intermediate columns called (i.e. _{`col_name`}_) are combined into one as `col_name`
df = df.assign(**{f"{col_name}": lambda df: df[cols].any(axis=1)})
return df.drop(cols, axis=1)