Source code for hcrystalball.model_selection._data_preparation

import numpy as np
import pandas as pd


[docs]def partition_data(df, partition_by): """Partition data by values found in one or more columns. For each of the selected columns the unique values will be determined and a selection will be made for each element in the cross product of the unique values. Parameters ---------- df : pandas.DataFrame Data to be partitioned partition_by : list Column names to partition by Returns ------- dict Partition dictionary with keys: * labels : Tuple of dictionaries whose keys are the column names and values are the actual values in the column * data : Tuple of pandas.DataFrame objects holding the subset of the data with """ labels = [] data = [] for key, group in df.groupby(partition_by): if not isinstance(key, tuple): key = (key,) labels.append(dict(zip(partition_by, key))) data.append(group.drop(partition_by, axis=1)) return {"labels": tuple(labels), "data": tuple(data)}
[docs]def partition_data_by_values(df, column, partition_values, default_df=None): """Partition data by one column and a fixed set ov values within that column. If a value is not present, optionally provide default data for the partition. Parameters ---------- df : pandas.DataFrame Data to be partitioned column : str column with values to partition by partition_values: list values to partition by default_df : pandas.DataFrame data to be used as default in case value is not present Returns ------- dict Partition dictionary with keys: * labels : Tuple of dictionaries whose keys are the column names and values are the actual values in the column * data : Tuple of pandas.DataFrame objects holding the subset of the data with """ labels = [] data = [] for v in partition_values: part = df[df[column] == v] if part.empty: if default_df is None: continue part = default_df.copy() labels.append({column: v}) data.append(part.drop(column, axis=1)) return {"labels": tuple(labels), "data": tuple(data)}
[docs]def filter_data(df, include_rules=None, exclude_rules=None): """Filter provided dataframe by {column:value} rules. Parameters ---------- df : pandas.DataFrame Data to be filtered include_rules : dict Rules for what to include. The keys of the dictionaries should be the name of the filtered columns, while the values of the dictionary should be list of values to include. exclude_rules : dict Rules for what to include. The keys of the dictionaries should be the name of the filtered columns, while the values of the dictionary should be list of values to exclude. Returns ------- pandas.DataFrame: Data of the same type / format as the output with the filters applied. """ df = df.copy() # Check for overlap between include and exclude rules and raise an exception if there is any overlap if (include_rules is not None) & (exclude_rules is not None): common_keys = set(include_rules.keys()) & set(exclude_rules.keys()) for key in common_keys: common_values = set(include_rules[key]) & set(exclude_rules[key]) if len(common_values) > 0: raise ValueError( f"Overlap is found in include_rules and exclude_rules in key `{key}`." f"This is not allowed" ) if include_rules is not None: if not isinstance(include_rules, dict): raise TypeError("`include_rules` is not a dictionary.") mask = pd.Series(index=df.index) mask[:] = True for key, value in include_rules.items(): mask = mask & (df[key].isin(value)) df = df.loc[mask, :] if exclude_rules is not None: if not isinstance(exclude_rules, dict): raise TypeError("`exclude_rules` is not a dictionary.") mask = pd.Series(index=df.index) mask[:] = True for key, value in exclude_rules.items(): mask = mask & (~df[key].isin(value)) df = df.loc[mask, :] return df
[docs]def prepare_data_for_training( df, frequency, partition_columns, parallel_over_columns=None, country_code_column=None, ): """Prepare data for model selection. Transforms data to a form handled by model selection / training, ensuring correct frequency and filling NaN Parameters ---------- df : pandas.DataFrame Data to be transformed, must have a date column of type `str` frequency : str frequency identifier ('D', 'M' etc.) parallel_over_columns: list column(s) which define logical segmentation of data for training country_code_column : str name of columns from which to take holiday ISO information Returns ------- pandas.DataFrame Resampled, aggregated data """ parallel_over_columns = parallel_over_columns or {} partition_columns = list(set(partition_columns).difference(parallel_over_columns)) # TODO this check should go into separate function for check of exogeneous variables country_code_columns = ( [country_code_column] if isinstance(country_code_column, str) else country_code_column ) if country_code_column and not set(country_code_columns).issubset(set(df.columns)): raise KeyError( f"Column(s) {country_code_column} provided as `country_code_column` is not in dataframe!" ) df = df.astype({col: "category" for col in partition_columns}) num_cols = df.select_dtypes(include=np.number).columns.tolist() cat_cols = df.columns.difference(num_cols + partition_columns) if len(partition_columns) != 0: df = df.groupby(partition_columns) df = ( df.resample(frequency) .agg({**{col: "sum" for col in num_cols}, **{col: "last" for col in cat_cols}}) .fillna( { **{col: 0 for col in num_cols}, **{col: "" for col in cat_cols.difference(country_code_columns or set())}, } ) .reset_index(partition_columns) ) if country_code_columns: df[country_code_columns] = df[country_code_columns].fillna(method="ffill") return df