Source code for etna.transforms.feature_selection.feature_importance
import warnings
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union
import numpy as np
import pandas as pd
from catboost import CatBoostRegressor
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.tree import ExtraTreeRegressor
from typing_extensions import Literal
from etna.analysis import RelevanceTable
from etna.analysis.feature_selection.mrmr_selection import AggregationMode
from etna.analysis.feature_selection.mrmr_selection import mrmr
from etna.datasets import TSDataset
from etna.distributions import BaseDistribution
from etna.distributions import CategoricalDistribution
from etna.distributions import IntDistribution
from etna.transforms.feature_selection import BaseFeatureSelectionTransform
TreeBasedRegressor = Union[
DecisionTreeRegressor,
ExtraTreeRegressor,
RandomForestRegressor,
ExtraTreesRegressor,
GradientBoostingRegressor,
CatBoostRegressor,
]
[docs]class TreeFeatureSelectionTransform(BaseFeatureSelectionTransform):
"""Transform that selects features according to tree-based models feature importance.
Notes
-----
Transform works with any type of features, however most of the models works only with regressors.
Therefore, it is recommended to pass the regressors into the feature selection transforms.
"""
def __init__(
self,
model: Union[Literal["catboost"], Literal["random_forest"], TreeBasedRegressor],
top_k: int,
features_to_use: Union[List[str], Literal["all"]] = "all",
return_features: bool = False,
):
"""
Init TreeFeatureSelectionTransform.
Parameters
----------
model:
Model to make selection, it should have ``feature_importances_`` property
(e.g. all tree-based regressors in sklearn).
If ``catboost.CatBoostRegressor`` is given with no ``cat_features`` parameter,
then ``cat_features`` are set during ``fit`` to be equal to columns of category type.
Pre-defined options are also available:
* catboost: ``catboost.CatBoostRegressor(iterations=1000, silent=True)``;
* random_forest: ``sklearn.ensemble.RandomForestRegressor(n_estimators=100, random_state=0)``.
top_k:
num of features to select; if there are not enough features, then all will be selected
features_to_use:
columns of the dataset to select from; if "all" value is given, all columns are used
return_features:
indicates whether to return features or not.
"""
if not isinstance(top_k, int) or top_k < 0:
raise ValueError("Parameter top_k should be positive integer")
super().__init__(features_to_use=features_to_use, return_features=return_features)
self.top_k = top_k
if isinstance(model, str):
if model == "catboost":
self.model = CatBoostRegressor(iterations=1000, silent=True)
elif model == "random_forest":
self.model = RandomForestRegressor(random_state=0)
else:
raise ValueError(f"Not a valid option for model: {model}")
else:
self.model = model
def _get_train(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Get train data for model."""
features = self._get_features_to_use(df)
df = TSDataset.to_flatten(df).dropna()
train_target = df["target"]
train_data = df[features]
return train_data, train_target
def _get_features_weights(self, df: pd.DataFrame) -> Dict[str, float]:
"""Get weights for features based on model feature importances."""
train_data, train_target = self._get_train(df)
if isinstance(self.model, CatBoostRegressor) and self.model.get_param("cat_features") is None:
dtypes = train_data.dtypes
cat_features = dtypes[dtypes == "category"].index.tolist()
self.model.fit(train_data, train_target, cat_features=cat_features)
else:
self.model.fit(train_data, train_target)
weights_array = self.model.feature_importances_
weights_dict = {column: weights_array[i] for i, column in enumerate(train_data.columns)}
return weights_dict
@staticmethod
def _select_top_k_features(weights: Dict[str, float], top_k: int) -> List[str]:
keys = np.array(list(weights.keys()))
values = np.array(list(weights.values()))
idx_sort = np.argsort(values)[::-1]
idx_selected = idx_sort[:top_k]
return keys[idx_selected].tolist()
def _fit(self, df: pd.DataFrame) -> "TreeFeatureSelectionTransform":
"""
Fit the model and remember features to select.
Parameters
----------
df:
dataframe with all segments data
Returns
-------
result:
instance after fitting
"""
if len(self._get_features_to_use(df)) == 0:
warnings.warn("It is not possible to select features if there aren't any")
return self
weights = self._get_features_weights(df)
self.selected_features = self._select_top_k_features(weights, self.top_k)
return self
[docs] def params_to_tune(self) -> Dict[str, BaseDistribution]:
"""Get default grid for tuning hyperparameters.
This grid tunes parameters: ``model``, ``top_k``. Other parameters are expected to be set by the user.
For ``model`` parameter only pre-defined options are suggested.
For ``top_k`` parameter the maximum suggested value is not greater than ``self.top_k``.
Returns
-------
:
Grid to tune.
"""
return {
"model": CategoricalDistribution(["catboost", "random_forest"]),
"top_k": IntDistribution(low=1, high=self.top_k),
}
[docs]class MRMRFeatureSelectionTransform(BaseFeatureSelectionTransform):
"""Transform that selects features according to MRMR variable selection method adapted to the timeseries case.
Notes
-----
Transform works with any type of features, however most of the models works only with regressors.
Therefore, it is recommended to pass the regressors into the feature selection transforms.
"""
def __init__(
self,
relevance_table: RelevanceTable,
top_k: int,
features_to_use: Union[List[str], Literal["all"]] = "all",
fast_redundancy: bool = False,
relevance_aggregation_mode: str = AggregationMode.mean,
redundancy_aggregation_mode: str = AggregationMode.mean,
atol: float = 1e-10,
return_features: bool = False,
**relevance_params,
):
"""
Init MRMRFeatureSelectionTransform.
Parameters
----------
relevance_table:
method to calculate relevance table
top_k:
num of features to select; if there are not enough features, then all will be selected
features_to_use:
columns of the dataset to select from
if "all" value is given, all columns are used
fast_redundancy:
* True: compute redundancy only inside the the segments, time complexity :math:`O(top\_k * n\_segments * n\_features * history\_len)
* False: compute redundancy for all the pairs of segments, time complexity :math:`O(top\_k * n\_segments^2 * n\_features * history\_len)`
relevance_aggregation_mode:
the method for relevance values per-segment aggregation
redundancy_aggregation_mode:
the method for redundancy values per-segment aggregation
atol:
the absolute tolerance to compare the float values
return_features:
indicates whether to return features or not.
"""
if not isinstance(top_k, int) or top_k < 0:
raise ValueError("Parameter top_k should be positive integer")
super().__init__(features_to_use=features_to_use, return_features=return_features)
self.relevance_table = relevance_table
self.top_k = top_k
self.fast_redundancy = fast_redundancy
self.relevance_aggregation_mode = relevance_aggregation_mode
self.redundancy_aggregation_mode = redundancy_aggregation_mode
self.atol = atol
self.relevance_params = relevance_params
def _fit(self, df: pd.DataFrame) -> "MRMRFeatureSelectionTransform":
"""
Fit the method and remember features to select.
Parameters
----------
df:
dataframe with all segments data
Returns
-------
result:
instance after fitting
"""
features = self._get_features_to_use(df)
ts = TSDataset(df=df, freq=pd.infer_freq(df.index))
relevance_table = self.relevance_table(ts[:, :, "target"], ts[:, :, features], **self.relevance_params)
if not self.relevance_table.greater_is_better:
relevance_table *= -1
self.selected_features = mrmr(
relevance_table=relevance_table,
regressors=ts[:, :, features],
top_k=self.top_k,
fast_redundancy=self.fast_redundancy,
relevance_aggregation_mode=self.relevance_aggregation_mode,
redundancy_aggregation_mode=self.redundancy_aggregation_mode,
atol=self.atol,
)
return self
[docs] def params_to_tune(self) -> Dict[str, BaseDistribution]:
"""Get default grid for tuning hyperparameters.
This grid tunes ``top_k`` parameter. Other parameters are expected to be set by the user.
For ``top_k`` parameter the maximum suggested value is not greater than ``self.top_k``.
Returns
-------
:
Grid to tune.
"""
return {
"top_k": IntDistribution(low=1, high=self.top_k),
}