Source code for etna.transforms.feature_selection.gale_shapley
import warnings
from math import ceil
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
import pandas as pd
from typing_extensions import Literal
from etna.analysis import RelevanceTable
from etna.core import BaseMixin
from etna.distributions import BaseDistribution
from etna.distributions import CategoricalDistribution
from etna.distributions import IntDistribution
from etna.transforms.feature_selection.base import BaseFeatureSelectionTransform
class BaseGaleShapley(BaseMixin):
    """State shared by both sides (segments and features) of a Gale-Shapley matching."""

    def __init__(self, name: str, ranked_candidates: List[str]):
        """Init BaseGaleShapley.

        Parameters
        ----------
        name:
            name of this participant
        ranked_candidates:
            names of the participants of the opposite side, ordered from the most
            to the least preferred one
        """
        self.name = name
        self.ranked_candidate = ranked_candidates
        # Position of each candidate in the preference list; a smaller value means "more preferred".
        self.candidates_rank = dict(zip(self.ranked_candidate, range(len(self.ranked_candidate))))
        # Current (temporary) partner and its position in the preference list.
        self.tmp_match: Optional[str] = None
        self.tmp_match_rank: Optional[int] = None
        # True while the participant has no partner.
        self.is_available = True

    def update_tmp_match(self, name: str):
        """Make ``name`` the current partner of this participant.

        Parameters
        ----------
        name:
            name of the candidate to match with; it must be present in the preference list,
            otherwise the rank lookup raises ``KeyError``
        """
        self.tmp_match = name
        self.tmp_match_rank = self.candidates_rank[name]
        self.is_available = False

    def reset_tmp_match(self):
        """Drop the current partner and mark this participant as available again."""
        self.tmp_match = self.tmp_match_rank = None
        self.is_available = True
class SegmentGaleShapley(BaseGaleShapley):
    """Segment side of the Gale-Shapley matching: the side that walks its preference list."""

    def __init__(self, name: str, ranked_candidates: List[str]):
        """Init SegmentGaleShapley.

        Parameters
        ----------
        name:
            name of segment
        ranked_candidates:
            feature names ordered from the most to the least important for this segment
        """
        super().__init__(name=name, ranked_candidates=ranked_candidates)
        # Index (in the preference list) of the last feature this segment has tried; None before the first try.
        self.last_candidate: Optional[int] = None

    def update_tmp_match(self, name: str):
        """Match this segment with the given feature and move the cursor to that feature.

        Parameters
        ----------
        name:
            name of feature to match
        """
        super().update_tmp_match(name=name)
        # Keep the cursor on the matched feature so that the next try starts right after it.
        self.last_candidate = self.tmp_match_rank

    def get_next_candidate(self) -> Optional[str]:
        """Advance the cursor and return the feature it points to.

        Returns
        -------
        name:
            name of the next feature to try, or None when the preference list is exhausted
        """
        # The cursor moves on every call, including calls made after the list has been exhausted.
        next_index = 0 if self.last_candidate is None else self.last_candidate + 1
        self.last_candidate = next_index
        if next_index < len(self.ranked_candidate):
            return self.ranked_candidate[next_index]
        return None
class FeatureGaleShapley(BaseGaleShapley):
    """Feature side of the Gale-Shapley matching: the side that accepts or rejects segments."""

    def check_segment(self, segment: str) -> bool:
        """Tell whether the given segment should replace the current match.

        Parameters
        ----------
        segment:
            name of the segment to check

        Returns
        -------
        is_better:
            True if the feature has no match yet, or if ``segment`` stands strictly earlier
            in the preference list than the currently matched segment.
        """
        has_match = self.tmp_match is not None and self.tmp_match_rank is not None
        if not has_match:
            # An unmatched feature accepts any segment.
            return True
        return self.candidates_rank[segment] < self.tmp_match_rank
class GaleShapleyMatcher(BaseMixin):
    """Runner of the Gale-Shapley matching in which segments try features one by one."""

    def __init__(self, segments: List[SegmentGaleShapley], features: List[FeatureGaleShapley]):
        """Init GaleShapleyMatcher.

        Parameters
        ----------
        segments:
            segments that take part in the matching
        features:
            features that take part in the matching
        """
        self.segments = segments
        self.features = features
        # Name -> object lookups used while resolving candidates and current matches.
        self.segment_by_name = {item.name: item for item in self.segments}
        self.feature_by_name = {item.name: item for item in self.features}

    @staticmethod
    def match(segment: SegmentGaleShapley, feature: FeatureGaleShapley):
        """Record a match on both the segment and the feature.

        Parameters
        ----------
        segment:
            segment to match
        feature:
            feature to match
        """
        segment.update_tmp_match(name=feature.name)
        feature.update_tmp_match(name=segment.name)

    @staticmethod
    def break_match(segment: SegmentGaleShapley, feature: FeatureGaleShapley):
        """Clear the current match on both the segment and the feature.

        Parameters
        ----------
        segment:
            segment to release
        feature:
            feature to release
        """
        segment.reset_tmp_match()
        feature.reset_tmp_match()

    def _gale_shapley_iteration(self, available_segments: List[SegmentGaleShapley]) -> bool:
        """Let every given segment try its next feature once.

        Parameters
        ----------
        available_segments:
            segments that have no match at the start of this iteration

        Returns
        -------
        success:
            True if at least one segment still had a feature to try during this iteration

        Notes
        -----
        The flag is needed because the number of features may be smaller than the number of segments.
        In that case some segments stay available forever, and without the flag the loop in ``__call__``
        would never terminate.
        """
        attempted = False
        for proposer in available_segments:
            target_name = proposer.get_next_candidate()
            if target_name is None:
                # This segment has already tried every feature in its list.
                continue
            target = self.feature_by_name[target_name]
            attempted = True
            if not target.check_segment(segment=proposer.name):
                # The feature prefers the segment it is already matched with.
                continue
            if not target.is_available:
                # The feature is taken by a less preferred segment: release that pair first.
                previous_holder = self.segment_by_name[target.tmp_match]  # type: ignore
                self.break_match(segment=previous_holder, feature=target)
            self.match(segment=proposer, feature=target)
        return attempted

    def _get_available_segments(self) -> List[SegmentGaleShapley]:
        """Collect the segments that currently have no match."""
        return [item for item in self.segments if item.is_available]

    def __call__(self) -> Dict[str, str]:
        """Run matching.

        Returns
        -------
        matching:
            dict mapping segment name to the name of its matched feature;
            segments left without a match are not included
        """
        while True:
            free_segments = self._get_available_segments()
            if not free_segments:
                break
            if not self._gale_shapley_iteration(available_segments=free_segments):
                # Nobody had anything left to try, so further iterations cannot change the result.
                break

        matching = {}
        for item in self.segments:
            if item.tmp_match is not None:
                matching[item.name] = item.tmp_match
        return matching
class GaleShapleyFeatureSelectionTransform(BaseFeatureSelectionTransform):
    r"""Transform that provides feature filtering by Gale-Shapley matching algorithm according to the relevance table.

    Notes
    -----
    Transform works with any type of features, however most of the models work only with regressors.
    Therefore, it is recommended to pass the regressors into the feature selection transforms.

    As input, we have a table of relevances with size :math:`N_{f} \times N_{s}` where :math:`N_{f}` -- number of features,
    :math:`N_{s}` -- number of segments.

    Procedure of filtering features consists of :math:`\lceil \frac{k}{N_{s}} \rceil` iterations.
    Algorithm of each iteration:

    - build a matching between segments and features by `Gale–Shapley algorithm <https://en.wikipedia.org/wiki/Gale%E2%80%93Shapley_algorithm>`_
      according to the relevance table, during the matching segments send proposals to features;

    - select features to add by taking matched feature for each segment;

    - add selected features to accumulated list of selected features taking into account that this list shouldn't exceed the size of ``top_k``;

    - remove added features from future consideration.
    """

    def __init__(
        self,
        relevance_table: RelevanceTable,
        top_k: int,
        features_to_use: Union[List[str], Literal["all"]] = "all",
        use_rank: bool = False,
        return_features: bool = False,
        **relevance_params,
    ):
        """Init GaleShapleyFeatureSelectionTransform.

        Parameters
        ----------
        relevance_table:
            class to build relevance table
        top_k:
            number of features that should be selected from all the given ones
        features_to_use:
            columns of the dataset to select from
            if "all" value is given, all columns are used
        use_rank:
            if True, use rank in relevance table computation
        return_features:
            indicates whether to return features or not.
        relevance_params:
            additional keyword arguments forwarded to ``relevance_table`` when it is called
        """
        super().__init__(features_to_use=features_to_use, return_features=return_features)
        self.relevance_table = relevance_table
        self.top_k = top_k
        self.use_rank = use_rank
        # With ranks the table is treated as "lower is better" regardless of the underlying relevance measure.
        self.greater_is_better = False if use_rank else relevance_table.greater_is_better
        self.relevance_params = relevance_params

    def _compute_relevance_table(self, df: pd.DataFrame, features: List[str]) -> pd.DataFrame:
        """Compute relevance table with given data.

        Parameters
        ----------
        df:
            dataframe with two-level columns; the second level holds "target" and the feature names
        features:
            names of features to compute relevance for

        Returns
        -------
        table:
            result of calling ``self.relevance_table`` on the target and feature slices of ``df``
        """
        targets_df = df.loc[:, pd.IndexSlice[:, "target"]]
        features_df = df.loc[:, pd.IndexSlice[:, features]]
        table = self.relevance_table(
            df=targets_df, df_exog=features_df, return_ranks=self.use_rank, **self.relevance_params
        )
        return table

    @staticmethod
    def _get_ranked_list(table: pd.DataFrame, ascending: bool) -> Dict[str, List[str]]:
        """Get ranked lists of candidates from table of relevance.

        Parameters
        ----------
        table:
            relevance table; every row is turned into one ranked list of its column labels
        ascending:
            sort order of the values inside each row

        Returns
        -------
        ranked_features:
            dict of row label x column labels sorted by the row values
        """
        ranked_features = {key: list(table.loc[key].sort_values(ascending=ascending).index) for key in table.index}
        return ranked_features

    @staticmethod
    def _compute_gale_shapley_steps_number(top_k: int, n_segments: int, n_features: int) -> int:
        """Get number of necessary Gale-Shapley algo iterations.

        Parameters
        ----------
        top_k:
            number of features to select
        n_segments:
            number of segments
        n_features:
            number of features to select from

        Returns
        -------
        steps:
            1 (with a warning) if ``top_k`` exceeds ``n_features`` or is less than ``n_segments``,
            otherwise ``ceil(top_k / n_segments)``
        """
        if n_features < top_k:
            warnings.warn(
                f"Given top_k={top_k} is bigger than n_features={n_features}. " f"Transform will not filter features."
            )
            return 1
        if top_k < n_segments:
            warnings.warn(
                f"Given top_k={top_k} is less than n_segments={n_segments}. "
                f"Algo will filter data without Gale-Shapley run."
            )
            return 1
        return ceil(top_k / n_segments)

    @staticmethod
    def _gale_shapley_iteration(
        segment_features_ranking: Dict[str, List[str]],
        feature_segments_ranking: Dict[str, List[str]],
    ) -> Dict[str, str]:
        """Build matching for all the segments.

        Parameters
        ----------
        segment_features_ranking:
            dict of relevance segment x sorted features
        feature_segments_ranking:
            dict of relevance feature x sorted segments

        Returns
        -------
        matching dict:
            dict of segment x feature
        """
        gssegments = [
            SegmentGaleShapley(name=name, ranked_candidates=ranked_candidates)
            for name, ranked_candidates in segment_features_ranking.items()
        ]
        gsfeatures = [
            FeatureGaleShapley(name=name, ranked_candidates=ranked_candidates)
            for name, ranked_candidates in feature_segments_ranking.items()
        ]
        matcher = GaleShapleyMatcher(segments=gssegments, features=gsfeatures)
        return matcher()

    @staticmethod
    def _update_ranking_list(
        segment_features_ranking: Dict[str, List[str]], features_to_drop: List[str]
    ) -> Dict[str, List[str]]:
        """Delete chosen features from candidates ranked lists.

        The lists are modified in place and the same dict is returned.
        Features that are absent from a list are ignored.
        """
        dropped = set(features_to_drop)
        for ranking in segment_features_ranking.values():
            # Single pass per list instead of one ``list.remove`` scan per dropped feature.
            ranking[:] = [feature for feature in ranking if feature not in dropped]
        return segment_features_ranking

    @staticmethod
    def _process_last_step(
        matches: Dict[str, str], relevance_table: pd.DataFrame, n: int, greater_is_better: bool
    ) -> List[str]:
        """Choose n features from given ones according to relevance_matrix.

        Parameters
        ----------
        matches:
            dict of segment x matched feature
        relevance_table:
            relevance table indexed by segment with features as columns
        n:
            number of features to keep
        greater_is_better:
            if True, features with the biggest relevance are kept, otherwise with the smallest

        Returns
        -------
        selected_features:
            at most ``n`` matched features, best first
        """
        # Relevance of every matched feature for the segment it was matched with.
        features_relevance = {feature: relevance_table.loc[segment, feature] for segment, feature in matches.items()}
        ordered_features = sorted(features_relevance, key=features_relevance.get, reverse=greater_is_better)
        return ordered_features[:n]

    def _fit(self, df: pd.DataFrame) -> "GaleShapleyFeatureSelectionTransform":
        """Fit Gale-Shapley algo and find a pool of ``top_k`` features.

        Parameters
        ----------
        df:
            dataframe to fit algo

        Returns
        -------
        :
            fitted transform with ``selected_features`` rebuilt from scratch
        """
        features = self._get_features_to_use(df=df)
        relevance_table = self._compute_relevance_table(df=df, features=features)

        # ``self.greater_is_better`` already accounts for ``use_rank``, so it must drive the sort order:
        # using ``relevance_table.greater_is_better`` directly would invert the preference lists when ranks
        # are requested from a "greater is better" table.
        # NOTE(review): this assumes rank 1 denotes the most relevant feature -- confirm against RelevanceTable.
        ascending = not self.greater_is_better
        segment_features_ranking = self._get_ranked_list(table=relevance_table, ascending=ascending)
        feature_segments_ranking = self._get_ranked_list(table=relevance_table.T, ascending=ascending)
        n_segments = len(segment_features_ranking)
        n_features = len(feature_segments_ranking)

        gale_shapley_steps_number = self._compute_gale_shapley_steps_number(
            top_k=self.top_k,
            n_segments=n_segments,
            n_features=n_features,
        )

        if n_features < self.top_k:
            # The warning emitted above promises that nothing is filtered out, so keep every feature
            # instead of the handful that a single matching round would produce.
            self.selected_features = list(feature_segments_ranking)
            return self

        # Number of features to take at the last step if ``top_k`` is not a multiple of the number of segments.
        last_step_features_number = self.top_k % n_segments
        # Collected locally so that a repeated ``fit`` does not append to the result of the previous one.
        selected_features: List[str] = []
        for step in range(gale_shapley_steps_number):
            matches = self._gale_shapley_iteration(
                segment_features_ranking=segment_features_ranking,
                feature_segments_ranking=feature_segments_ranking,
            )
            is_last_step = step == gale_shapley_steps_number - 1
            if is_last_step and last_step_features_number != 0:
                step_features = self._process_last_step(
                    matches=matches,
                    relevance_table=relevance_table,
                    n=last_step_features_number,
                    greater_is_better=self.greater_is_better,
                )
            else:
                step_features = list(matches.values())
            selected_features.extend(step_features)
            segment_features_ranking = self._update_ranking_list(
                segment_features_ranking=segment_features_ranking, features_to_drop=step_features
            )
        self.selected_features = selected_features
        return self

    def params_to_tune(self) -> Dict[str, BaseDistribution]:
        """Get default grid for tuning hyperparameters.

        This grid tunes parameters: ``top_k``, ``use_rank``. Other parameters are expected to be set by the user.

        For ``top_k`` parameter the maximum suggested value is not greater than ``self.top_k``.

        Returns
        -------
        :
            Grid to tune.
        """
        return {
            "top_k": IntDistribution(low=1, high=self.top_k),
            "use_rank": CategoricalDistribution([False, True]),
        }