import math
import warnings
from abc import abstractmethod
from copy import deepcopy
from enum import Enum
from typing import Any
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
import numpy as np
import pandas as pd
from joblib import Parallel
from joblib import delayed
from scipy.stats import norm
from typing_extensions import TypedDict
from typing_extensions import assert_never
from etna.core import AbstractSaveable
from etna.core import BaseMixin
from etna.datasets import TSDataset
from etna.distributions import BaseDistribution
from etna.loggers import tslogger
from etna.metrics import Metric
from etna.metrics import MetricAggregationMode
Timestamp = Union[str, pd.Timestamp]
class CrossValidationMode(str, Enum):
"""Enum for different cross-validation modes."""
expand = "expand"
constant = "constant"
@classmethod
def _missing_(cls, value):
raise NotImplementedError(
f"{value} is not a valid {cls.__name__}. Only {', '.join([repr(m.value) for m in cls])} modes allowed"
)
class FoldMask(BaseMixin):
"""Container to hold the description of the fold mask.
Fold masks are expected to be used for backtest strategy customization.
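Examples
--------
A minimal sketch; the timestamps below are illustrative and assume a daily dataset:
>>> from etna.pipeline import FoldMask
>>> mask = FoldMask(
...     first_train_timestamp="2020-01-01",
...     last_train_timestamp="2020-01-31",
...     target_timestamps=["2020-02-01", "2020-02-02"],
... )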
"""
def __init__(
self,
first_train_timestamp: Optional[Timestamp],
last_train_timestamp: Timestamp,
target_timestamps: List[Timestamp],
):
"""Init FoldMask.
Parameters
----------
first_train_timestamp:
First train timestamp, the first timestamp in the dataset if None is passed
last_train_timestamp:
Last train timestamp
target_timestamps:
List of target timestamps
"""
self.first_train_timestamp = pd.to_datetime(first_train_timestamp) if first_train_timestamp is not None else None
self.last_train_timestamp = pd.to_datetime(last_train_timestamp)
self.target_timestamps = sorted([pd.to_datetime(timestamp) for timestamp in target_timestamps])
self._validate_last_train_timestamp()
self._validate_target_timestamps()
def _validate_last_train_timestamp(self):
"""Check that last train timestamp is later then first train timestamp."""
if self.first_train_timestamp and self.last_train_timestamp < self.first_train_timestamp:
raise ValueError("Last train timestamp should be not sooner than first train timestamp!")
def _validate_target_timestamps(self):
"""Check that all target timestamps are later then last train timestamp."""
first_target_timestamp = self.target_timestamps[0]
if first_target_timestamp <= self.last_train_timestamp:
raise ValueError("Target timestamps should be strictly later then last train timestamp!")
def validate_on_dataset(self, ts: TSDataset, horizon: int):
"""Validate fold mask on the dataset with specified horizon.
Parameters
----------
ts:
Dataset to validate on
horizon:
Forecasting horizon
"""
dataset_timestamps = list(ts.index)
dataset_description = ts.describe()
min_first_timestamp = ts.index.min()
if self.first_train_timestamp is not None and self.first_train_timestamp < min_first_timestamp:
raise ValueError(f"First train timestamp should not be earlier than {min_first_timestamp}!")
last_timestamp = dataset_description["end_timestamp"].min()
if self.last_train_timestamp > last_timestamp:
raise ValueError(f"Last train timestamp should be not later than {last_timestamp}!")
dataset_first_target_timestamp = dataset_timestamps[dataset_timestamps.index(self.last_train_timestamp) + 1]
mask_first_target_timestamp = self.target_timestamps[0]
if mask_first_target_timestamp < dataset_first_target_timestamp:
raise ValueError(f"First target timestamp should be not sooner than {dataset_first_target_timestamp}!")
dataset_last_target_timestamp = dataset_timestamps[
dataset_timestamps.index(self.last_train_timestamp) + horizon
]
mask_last_target_timestamp = self.target_timestamps[-1]
if dataset_last_target_timestamp < mask_last_target_timestamp:
raise ValueError(f"Last target timestamp should be not later than {dataset_last_target_timestamp}!")
class AbstractPipeline(AbstractSaveable):
"""Interface for pipeline."""
@abstractmethod
def fit(self, ts: TSDataset) -> "AbstractPipeline":
"""Fit the Pipeline.
Parameters
----------
ts:
Dataset with timeseries data
Returns
-------
:
Fitted Pipeline instance
"""
pass
@abstractmethod
def forecast(
self,
ts: Optional[TSDataset] = None,
prediction_interval: bool = False,
quantiles: Sequence[float] = (0.025, 0.975),
n_folds: int = 3,
return_components: bool = False,
) -> TSDataset:
"""Make a forecast of the next points of a dataset.
The result of forecasting starts from the last point of ``ts``, not including it.
Parameters
----------
ts:
Dataset to forecast. If not given, the dataset passed to :py:meth:`fit` is used.
prediction_interval:
If True, returns prediction intervals for the forecast
quantiles:
Levels of prediction distribution. By default, 2.5% and 97.5% are taken to form a 95% prediction interval
n_folds:
Number of folds to use in the backtest for prediction interval estimation
return_components:
If True, additionally returns forecast components
Returns
-------
:
Dataset with predictions
"""
pass
@abstractmethod
def predict(
self,
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
prediction_interval: bool = False,
quantiles: Sequence[float] = (0.025, 0.975),
return_components: bool = False,
) -> TSDataset:
"""Make in-sample predictions on dataset in a given range.
Currently, when segments start at different timestamps,
we only guarantee correct behavior for ``start_timestamp`` not earlier than the beginning of all segments.
Parameters
----------
ts:
Dataset to make predictions on.
start_timestamp:
First timestamp of the prediction range to return; should not be earlier than the first timestamp in ``ts``,
and every segment is expected to begin at or before ``start_timestamp``.
If not set, the first timestamp at which all segments have data is taken.
end_timestamp:
Last timestamp of the prediction range to return; if not set, the last timestamp of ``ts`` is taken.
The value is expected to be less than or equal to the last timestamp in ``ts``.
prediction_interval:
If True, returns prediction intervals for the forecast.
quantiles:
Levels of prediction distribution. By default, 2.5% and 97.5% are taken to form a 95% prediction interval.
return_components:
If True, additionally returns forecast components
Returns
-------
:
Dataset with predictions in ``[start_timestamp, end_timestamp]`` range.
Raises
------
ValueError:
Value of ``end_timestamp`` is less than ``start_timestamp``.
ValueError:
Value of ``start_timestamp`` goes before the point where every segment has started.
ValueError:
Value of ``end_timestamp`` goes after the last timestamp.
"""
@abstractmethod
def backtest(
self,
ts: TSDataset,
metrics: List[Metric],
n_folds: Union[int, List[FoldMask]] = 5,
mode: Optional[str] = None,
aggregate_metrics: bool = False,
n_jobs: int = 1,
refit: Union[bool, int] = True,
stride: Optional[int] = None,
joblib_params: Optional[Dict[str, Any]] = None,
forecast_params: Optional[Dict[str, Any]] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Run backtest with the pipeline.
If ``refit != True`` and some component of the pipeline doesn't support forecasting with a gap, this component will raise an exception.
Parameters
----------
ts:
Dataset to fit models in backtest
metrics:
List of metrics to compute for each fold
n_folds:
Number of folds or the list of fold masks
mode:
Train generation policy: 'expand' or 'constant'. Works only if ``n_folds`` is an integer.
By default, it is set to 'expand'.
aggregate_metrics:
If True, aggregate metrics over folds; otherwise return raw per-fold metrics
n_jobs:
Number of jobs to run in parallel
refit:
Determines how often the pipeline should be retrained during iteration over folds.
* If ``True``: pipeline is retrained on each fold.
* If ``False``: pipeline is trained only on the first fold.
* If ``value: int``: pipeline is trained every ``value`` folds starting from the first.
stride:
Number of points between folds. Works only if ``n_folds`` is an integer. By default, it is set to ``horizon``.
joblib_params:
Additional parameters for :py:class:`joblib.Parallel`
forecast_params:
Additional parameters for :py:func:`~etna.pipeline.base.BasePipeline.forecast`
Returns
-------
metrics_df, forecast_df, fold_info_df: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
Metrics dataframe, forecast dataframe and dataframe with information about folds
"""
@abstractmethod
def params_to_tune(self) -> Dict[str, BaseDistribution]:
"""Get hyperparameter grid to tune.
Returns
-------
:
Grid with hyperparameters.
"""
class FoldParallelGroup(TypedDict):
"""Group for parallel fold processing."""
train_fold_number: int
train_mask: FoldMask
forecast_fold_numbers: List[int]
forecast_masks: List[FoldMask]
class _DummyMetric(Metric):
"""Dummy metric that is created only for implementation of BasePipeline._forecast_prediction_interval."""
def __init__(self, mode: str = MetricAggregationMode.per_segment, **kwargs):
super().__init__(mode=mode, metric_fn=self._compute_metric, **kwargs)
@staticmethod
def _compute_metric(y_true: np.ndarray, y_pred: np.ndarray) -> float:
return 0.0
@property
def greater_is_better(self) -> bool:
return False
def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
segments = set(y_true.df.columns.get_level_values("segment"))
metrics_per_segment = {}
for segment in segments:
metrics_per_segment[segment] = 0.0
metrics = self._aggregate_metrics(metrics_per_segment)
return metrics
class BasePipeline(AbstractPipeline, BaseMixin):
"""Base class for pipeline."""
def __init__(self, horizon: int):
self._validate_horizon(horizon=horizon)
self.horizon = horizon
self.ts: Optional[TSDataset] = None
@staticmethod
def _validate_horizon(horizon: int):
"""Check that given number of folds is grater than 1."""
if horizon <= 0:
raise ValueError("At least one point in the future is expected.")
@staticmethod
def _validate_quantiles(quantiles: Sequence[float]) -> Sequence[float]:
"""Check that given number of folds is grater than 1."""
for quantile in quantiles:
if not (0 < quantile < 1):
raise ValueError("Quantile should be a number from (0,1).")
return quantiles
@abstractmethod
def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
"""Make predictions."""
pass
def _forecast_prediction_interval(
self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int
) -> TSDataset:
"""Add prediction intervals to the forecasts."""
with tslogger.disable():
_, forecasts, _ = self.backtest(ts=ts, metrics=[_DummyMetric()], n_folds=n_folds)
self._add_forecast_borders(ts=ts, backtest_forecasts=forecasts, quantiles=quantiles, predictions=predictions)
return predictions
@staticmethod
def _validate_residuals_for_interval_estimation(backtest_forecasts: TSDataset, residuals: pd.DataFrame):
len_backtest, num_segments = residuals.shape
min_timestamp = backtest_forecasts.index.min()
max_timestamp = backtest_forecasts.index.max()
non_nan_counts = np.sum(~np.isnan(residuals.values), axis=0)
if np.any(non_nan_counts < len_backtest):
warnings.warn(
f"There are NaNs in target on time span from {min_timestamp} to {max_timestamp}. "
f"It can obstruct prediction interval estimation on history data."
)
if np.any(non_nan_counts < 2):
raise ValueError(
f"There aren't enough target values to evaluate prediction intervals on history! "
f"For each segment there should be at least 2 points with defined value in a "
f"time span from {min_timestamp} to {max_timestamp}. "
f"You can try to increase n_folds parameter to make time span bigger."
)
def _add_forecast_borders(
self, ts: TSDataset, backtest_forecasts: pd.DataFrame, quantiles: Sequence[float], predictions: TSDataset
) -> None:
"""Estimate prediction intervals and add to the forecasts."""
backtest_forecasts = TSDataset(df=backtest_forecasts, freq=ts.freq)
residuals = (
backtest_forecasts.loc[:, pd.IndexSlice[:, "target"]]
- ts[backtest_forecasts.index.min() : backtest_forecasts.index.max(), :, "target"]
)
self._validate_residuals_for_interval_estimation(backtest_forecasts=backtest_forecasts, residuals=residuals)
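# Under a Gaussian assumption on the residuals, the border for quantile q is
# prediction + sigma * Phi^{-1}(q), where sigma is the per-segment standard
# deviation of the backtest residuals and Phi^{-1} is the normal PPF.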
sigma = np.nanstd(residuals.values, axis=0)
borders = []
for quantile in quantiles:
z_q = norm.ppf(q=quantile)
border = predictions[:, :, "target"] + sigma * z_q
border.rename({"target": f"target_{quantile:.4g}"}, inplace=True, axis=1)
borders.append(border)
predictions.df = pd.concat([predictions.df] + borders, axis=1).sort_index(axis=1, level=(0, 1))
def forecast(
self,
ts: Optional[TSDataset] = None,
prediction_interval: bool = False,
quantiles: Sequence[float] = (0.025, 0.975),
n_folds: int = 3,
return_components: bool = False,
) -> TSDataset:
"""Make a forecast of the next points of a dataset.
The result of forecasting starts from the last point of ``ts``, not including it.
Parameters
----------
ts:
Dataset to forecast. If not given, the dataset passed to :py:meth:`fit` is used.
prediction_interval:
If True, returns prediction intervals for the forecast
quantiles:
Levels of prediction distribution. By default, 2.5% and 97.5% are taken to form a 95% prediction interval
n_folds:
Number of folds to use in the backtest for prediction interval estimation
return_components:
If True, additionally returns forecast components
Returns
-------
:
Dataset with predictions
Raises
------
NotImplementedError:
Adding target components is not currently implemented
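Examples
--------
A minimal sketch, assuming ``pipeline`` is a fitted concrete pipeline:
>>> forecast_ts = pipeline.forecast(  # doctest: +SKIP
...     prediction_interval=True, quantiles=(0.025, 0.975)
... )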
"""
if ts is None:
if self.ts is None:
raise ValueError(
"There is no ts to forecast! Pass ts into forecast method or make sure that pipeline is loaded with ts."
)
ts = self.ts
self._validate_quantiles(quantiles=quantiles)
self._validate_backtest_n_folds(n_folds=n_folds)
predictions = self._forecast(ts=ts, return_components=return_components)
if prediction_interval:
predictions = self._forecast_prediction_interval(
ts=ts, predictions=predictions, quantiles=quantiles, n_folds=n_folds
)
return predictions
@staticmethod
def _make_predict_timestamps(
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
) -> Tuple[pd.Timestamp, pd.Timestamp]:
min_timestamp = ts.describe()["start_timestamp"].max()
max_timestamp = ts.index[-1]
if start_timestamp is None:
start_timestamp = min_timestamp
if end_timestamp is None:
end_timestamp = max_timestamp
if start_timestamp < min_timestamp:
raise ValueError("Value of start_timestamp is less than beginning of some segments!")
if end_timestamp > max_timestamp:
raise ValueError("Value of end_timestamp is more than ending of dataset!")
if start_timestamp > end_timestamp:
raise ValueError("Value of end_timestamp is less than start_timestamp!")
return start_timestamp, end_timestamp
@abstractmethod
def _predict(
self,
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp],
end_timestamp: Optional[pd.Timestamp],
prediction_interval: bool,
quantiles: Sequence[float],
return_components: bool,
) -> TSDataset:
pass
def predict(
self,
ts: TSDataset,
start_timestamp: Optional[pd.Timestamp] = None,
end_timestamp: Optional[pd.Timestamp] = None,
prediction_interval: bool = False,
quantiles: Sequence[float] = (0.025, 0.975),
return_components: bool = False,
) -> TSDataset:
"""Make in-sample predictions on dataset in a given range.
Currently, when segments start at different timestamps,
we only guarantee correct behavior for ``start_timestamp`` not earlier than the beginning of all segments.
Parameters
----------
ts:
Dataset to make predictions on.
start_timestamp:
First timestamp of the prediction range to return; should not be earlier than the first timestamp in ``ts``,
and every segment is expected to begin at or before ``start_timestamp``.
If not set, the first timestamp at which all segments have data is taken.
end_timestamp:
Last timestamp of the prediction range to return; if not set, the last timestamp of ``ts`` is taken.
The value is expected to be less than or equal to the last timestamp in ``ts``.
prediction_interval:
If True, returns prediction intervals for the forecast.
quantiles:
Levels of prediction distribution. By default, 2.5% and 97.5% are taken to form a 95% prediction interval.
return_components:
If True, additionally returns forecast components
Returns
-------
:
Dataset with predictions in ``[start_timestamp, end_timestamp]`` range.
Raises
------
ValueError:
Value of ``end_timestamp`` is less than ``start_timestamp``.
ValueError:
Value of ``start_timestamp`` goes before the point where every segment has started.
ValueError:
Value of ``end_timestamp`` goes after the last timestamp.
NotImplementedError:
Adding target components is not currently implemented
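Examples
--------
A minimal sketch, assuming ``pipeline`` is a fitted concrete pipeline and ``ts`` is the dataset it was fitted on:
>>> predictions = pipeline.predict(  # doctest: +SKIP
...     ts=ts, start_timestamp=pd.Timestamp("2020-02-01")
... )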
"""
start_timestamp, end_timestamp = self._make_predict_timestamps(
ts=ts, start_timestamp=start_timestamp, end_timestamp=end_timestamp
)
self._validate_quantiles(quantiles=quantiles)
result = self._predict(
ts=ts,
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
prediction_interval=prediction_interval,
quantiles=quantiles,
return_components=return_components,
)
return result
def _init_backtest(self):
self._folds: Optional[Dict[int, Any]] = None
self._fold_column = "fold_number"
@staticmethod
def _validate_backtest_n_folds(n_folds: int):
"""Check that given n_folds value is >= 1."""
if n_folds < 1:
raise ValueError(f"Folds number should be a positive number, {n_folds} given")
@staticmethod
def _validate_backtest_mode(n_folds: Union[int, List[FoldMask]], mode: Optional[str]) -> CrossValidationMode:
if mode is None:
return CrossValidationMode.expand
if not isinstance(n_folds, int):
raise ValueError("Mode shouldn't be set if n_folds are fold masks!")
return CrossValidationMode(mode.lower())
@staticmethod
def _validate_backtest_stride(n_folds: Union[int, List[FoldMask]], horizon: int, stride: Optional[int]) -> int:
if stride is None:
return horizon
if not isinstance(n_folds, int):
raise ValueError("Stride shouldn't be set if n_folds are fold masks!")
if stride < 1:
raise ValueError(f"Stride should be a positive number, {stride} given!")
return stride
@staticmethod
def _validate_backtest_dataset(ts: TSDataset, n_folds: int, horizon: int, stride: int):
"""Check all segments have enough timestamps to validate forecaster with given number of splits."""
min_required_length = horizon + (n_folds - 1) * stride
segments = set(ts.df.columns.get_level_values("segment"))
for segment in segments:
segment_target = ts[:, segment, "target"]
if len(segment_target) < min_required_length:
raise ValueError(
f"All the series from feature dataframe should contain at least "
f"{horizon} + {n_folds-1} * {stride} = {min_required_length} timestamps; "
f"series {segment} does not."
)
@staticmethod
def _generate_masks_from_n_folds(
ts: TSDataset, n_folds: int, horizon: int, mode: CrossValidationMode, stride: int
) -> List[FoldMask]:
"""Generate fold masks from n_folds."""
if mode is CrossValidationMode.expand:
constant_history_length = 0
elif mode is CrossValidationMode.constant:
constant_history_length = 1
else:
assert_never(mode)
masks = []
dataset_timestamps = list(ts.index)
min_timestamp_idx, max_timestamp_idx = 0, len(dataset_timestamps)
for offset in range(n_folds, 0, -1):
min_train_idx = min_timestamp_idx + (n_folds - offset) * stride * constant_history_length
max_train_idx = max_timestamp_idx - stride * (offset - 1) - horizon - 1
min_test_idx = max_train_idx + 1
max_test_idx = max_train_idx + horizon
min_train, max_train = dataset_timestamps[min_train_idx], dataset_timestamps[max_train_idx]
min_test, max_test = dataset_timestamps[min_test_idx], dataset_timestamps[max_test_idx]
mask = FoldMask(
first_train_timestamp=min_train,
last_train_timestamp=max_train,
target_timestamps=list(pd.date_range(start=min_test, end=max_test, freq=ts.freq)),
)
masks.append(mask)
return masks
@staticmethod
def _validate_backtest_metrics(metrics: List[Metric]):
"""Check that given metrics are valid for backtest."""
if not metrics:
raise ValueError("At least one metric required")
for metric in metrics:
if not metric.mode == MetricAggregationMode.per_segment:
raise ValueError(
f"All the metrics should be in {MetricAggregationMode.per_segment}, "
f"{metric.name} metric is in {metric.mode} mode"
)
@staticmethod
def _generate_folds_datasets(
ts: TSDataset, masks: List[FoldMask], horizon: int
) -> Generator[Tuple[TSDataset, TSDataset], None, None]:
"""Generate folds."""
timestamps = list(ts.index)
for mask in masks:
min_train_idx = timestamps.index(mask.first_train_timestamp)
max_train_idx = timestamps.index(mask.last_train_timestamp)
min_test_idx = max_train_idx + 1
max_test_idx = max_train_idx + horizon
min_train, max_train = timestamps[min_train_idx], timestamps[max_train_idx]
min_test, max_test = timestamps[min_test_idx], timestamps[max_test_idx]
train, test = ts.train_test_split(
train_start=min_train, train_end=max_train, test_start=min_test, test_end=max_test
)
yield train, test
def _compute_metrics(
self, metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset
) -> Dict[str, Dict[str, float]]:
"""Compute metrics for given y_true, y_pred."""
metrics_values: Dict[str, Dict[str, float]] = {}
for metric in metrics:
metrics_values[metric.name] = metric(y_true=y_true, y_pred=y_pred) # type: ignore
return metrics_values
def _fit_backtest_pipeline(
self,
ts: TSDataset,
fold_number: int,
) -> "BasePipeline":
"""Fit pipeline for a given data in backtest."""
tslogger.start_experiment(job_type="training", group=str(fold_number))
pipeline = deepcopy(self)
pipeline.fit(ts=ts)
tslogger.finish_experiment()
return pipeline
def _forecast_backtest_pipeline(
self, pipeline: "BasePipeline", ts: TSDataset, fold_number: int, forecast_params: Dict[str, Any]
) -> TSDataset:
"""Make a forecast with a given pipeline in backtest."""
tslogger.start_experiment(job_type="forecasting", group=str(fold_number))
forecast = pipeline.forecast(ts=ts, **forecast_params)
tslogger.finish_experiment()
return forecast
def _process_fold_forecast(
self,
forecast: TSDataset,
train: TSDataset,
test: TSDataset,
pipeline: "BasePipeline",
fold_number: int,
mask: FoldMask,
metrics: List[Metric],
) -> Dict[str, Any]:
"""Process forecast made for a fold."""
tslogger.start_experiment(job_type="crossval", group=str(fold_number))
fold: Dict[str, Any] = {}
for stage_name, stage_df in zip(("train", "test"), (train, test)):
fold[f"{stage_name}_timerange"] = {}
fold[f"{stage_name}_timerange"]["start"] = stage_df.index.min()
fold[f"{stage_name}_timerange"]["end"] = stage_df.index.max()
forecast.df = forecast.df.loc[mask.target_timestamps]
test.df = test.df.loc[mask.target_timestamps]
fold["forecast"] = forecast
fold["metrics"] = deepcopy(pipeline._compute_metrics(metrics=metrics, y_true=test, y_pred=forecast))
tslogger.log_backtest_run(pd.DataFrame(fold["metrics"]), forecast.to_pandas(), test.to_pandas())
tslogger.finish_experiment()
return fold
def _get_backtest_metrics(self, aggregate_metrics: bool = False) -> pd.DataFrame:
"""Get dataframe with metrics."""
if self._folds is None:
raise ValueError("Something went wrong during backtest initialization!")
metrics_dfs = []
for i, fold in self._folds.items():
fold_metrics = pd.DataFrame(fold["metrics"]).reset_index().rename({"index": "segment"}, axis=1)
fold_metrics[self._fold_column] = i
metrics_dfs.append(fold_metrics)
metrics_df = pd.concat(metrics_dfs)
metrics_df.sort_values(["segment", self._fold_column], inplace=True)
if aggregate_metrics:
metrics_df = metrics_df.groupby("segment").mean().reset_index().drop(self._fold_column, axis=1)
return metrics_df
def _get_fold_info(self) -> pd.DataFrame:
"""Get information about folds."""
if self._folds is None:
raise ValueError("Something went wrong during backtest initialization!")
timerange_dfs = []
for fold_number, fold_info in self._folds.items():
tmp_df = pd.DataFrame()
for stage_name in ("train", "test"):
for border in ("start", "end"):
tmp_df[f"{stage_name}_{border}_time"] = [fold_info[f"{stage_name}_timerange"][border]]
tmp_df[self._fold_column] = fold_number
timerange_dfs.append(tmp_df)
timerange_df = pd.concat(timerange_dfs, ignore_index=True)
return timerange_df
def _get_backtest_forecasts(self) -> pd.DataFrame:
"""Get forecasts from different folds."""
if self._folds is None:
raise ValueError("Something went wrong during backtest initialization!")
forecasts_list = []
for fold_number, fold_info in self._folds.items():
forecast_ts = fold_info["forecast"]
segments = forecast_ts.segments
forecast = forecast_ts.df
fold_number_df = pd.DataFrame(
np.tile(fold_number, (forecast.index.shape[0], len(segments))),
columns=pd.MultiIndex.from_product([segments, [self._fold_column]], names=("segment", "feature")),
index=forecast.index,
)
forecast = forecast.join(fold_number_df)
forecasts_list.append(forecast)
forecasts = pd.concat(forecasts_list)
forecasts.sort_index(axis=1, inplace=True)
return forecasts
def _prepare_fold_masks(
self, ts: TSDataset, masks: Union[int, List[FoldMask]], mode: CrossValidationMode, stride: int
) -> List[FoldMask]:
"""Prepare and validate fold masks."""
if isinstance(masks, int):
self._validate_backtest_n_folds(n_folds=masks)
self._validate_backtest_dataset(ts=ts, n_folds=masks, horizon=self.horizon, stride=stride)
masks = self._generate_masks_from_n_folds(
ts=ts, n_folds=masks, horizon=self.horizon, mode=mode, stride=stride
)
for i, mask in enumerate(masks):
mask.first_train_timestamp = mask.first_train_timestamp if mask.first_train_timestamp is not None else ts.index[0]
masks[i] = mask
for mask in masks:
mask.validate_on_dataset(ts=ts, horizon=self.horizon)
return masks
@staticmethod
def _make_backtest_fold_groups(masks: List[FoldMask], refit: Union[bool, int]) -> List[FoldParallelGroup]:
"""Make groups of folds for backtest."""
if not refit:
refit = len(masks)
grouped_folds = []
num_groups = math.ceil(len(masks) / refit)
for group_id in range(num_groups):
train_fold_number = group_id * refit
forecast_fold_numbers = [train_fold_number + i for i in range(refit) if train_fold_number + i < len(masks)]
cur_group: FoldParallelGroup = {
"train_fold_number": train_fold_number,
"train_mask": masks[train_fold_number],
"forecast_fold_numbers": forecast_fold_numbers,
"forecast_masks": [masks[i] for i in forecast_fold_numbers],
}
grouped_folds.append(cur_group)
return grouped_folds
def _run_all_folds(
self,
masks: List[FoldMask],
ts: TSDataset,
metrics: List[Metric],
n_jobs: int,
refit: Union[bool, int],
joblib_params: Dict[str, Any],
forecast_params: Dict[str, Any],
) -> Dict[int, Any]:
"""Run pipeline on all folds."""
fold_groups = self._make_backtest_fold_groups(masks=masks, refit=refit)
with Parallel(n_jobs=n_jobs, **joblib_params) as parallel:
# fitting
fit_masks = [group["train_mask"] for group in fold_groups]
fit_datasets = (
train for train, _ in self._generate_folds_datasets(ts=ts, masks=fit_masks, horizon=self.horizon)
)
pipelines = parallel(
delayed(self._fit_backtest_pipeline)(ts=fit_ts, fold_number=fold_groups[group_idx]["train_fold_number"])
for group_idx, fit_ts in enumerate(fit_datasets)
)
# forecasting
forecast_masks = [group["forecast_masks"] for group in fold_groups]
forecast_datasets = (
(
train
for train, _ in self._generate_folds_datasets(
ts=ts, masks=group_forecast_masks, horizon=self.horizon
)
)
for group_forecast_masks in forecast_masks
)
forecasts_flat = parallel(
delayed(self._forecast_backtest_pipeline)(
ts=forecast_ts,
pipeline=pipelines[group_idx],
fold_number=fold_groups[group_idx]["forecast_fold_numbers"][idx],
forecast_params=forecast_params,
)
for group_idx, group_forecast_datasets in enumerate(forecast_datasets)
for idx, forecast_ts in enumerate(group_forecast_datasets)
)
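# ``forecasts_flat`` is a flat list ordered group by group; the forecast for
# fold ``idx`` of group ``group_idx`` sits at ``group_idx * refit + idx``.
# ``refit`` enters the arithmetic as an int (True -> 1; with False there is a
# single group, so the index reduces to ``idx``).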
# processing forecasts
fold_process_train_datasets = (
train for train, _ in self._generate_folds_datasets(ts=ts, masks=fit_masks, horizon=self.horizon)
)
fold_process_test_datasets = (
(
test
for _, test in self._generate_folds_datasets(
ts=ts, masks=group_forecast_masks, horizon=self.horizon
)
)
for group_forecast_masks in forecast_masks
)
fold_results_flat = parallel(
delayed(self._process_fold_forecast)(
forecast=forecasts_flat[group_idx * refit + idx],
train=train,
test=test,
pipeline=pipelines[group_idx],
fold_number=fold_groups[group_idx]["forecast_fold_numbers"][idx],
mask=fold_groups[group_idx]["forecast_masks"][idx],
metrics=metrics,
)
for group_idx, (train, group_fold_process_test_datasets) in enumerate(
zip(fold_process_train_datasets, fold_process_test_datasets)
)
for idx, test in enumerate(group_fold_process_test_datasets)
)
results = {
fold_number: fold_results_flat[group_idx * refit + idx]
for group_idx in range(len(fold_groups))
for idx, fold_number in enumerate(fold_groups[group_idx]["forecast_fold_numbers"])
}
return results
def backtest(
self,
ts: TSDataset,
metrics: List[Metric],
n_folds: Union[int, List[FoldMask]] = 5,
mode: Optional[str] = None,
aggregate_metrics: bool = False,
n_jobs: int = 1,
refit: Union[bool, int] = True,
stride: Optional[int] = None,
joblib_params: Optional[Dict[str, Any]] = None,
forecast_params: Optional[Dict[str, Any]] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Run backtest with the pipeline.
If ``refit != True`` and some component of the pipeline doesn't support forecasting with a gap, this component will raise an exception.
Parameters
----------
ts:
Dataset to fit models in backtest
metrics:
List of metrics to compute for each fold
n_folds:
Number of folds or the list of fold masks
mode:
Train generation policy: 'expand' or 'constant'. Works only if ``n_folds`` is an integer.
By default, it is set to 'expand'.
aggregate_metrics:
If True, aggregate metrics over folds; otherwise return raw per-fold metrics
n_jobs:
Number of jobs to run in parallel
refit:
Determines how often the pipeline should be retrained during iteration over folds.
* If ``True``: pipeline is retrained on each fold.
* If ``False``: pipeline is trained only on the first fold.
* If ``value: int``: pipeline is trained every ``value`` folds starting from the first.
stride:
Number of points between folds. Works only if ``n_folds`` is an integer. By default, it is set to ``horizon``.
joblib_params:
Additional parameters for :py:class:`joblib.Parallel`
forecast_params:
Additional parameters for :py:func:`~etna.pipeline.base.BasePipeline.forecast`
Returns
-------
metrics_df, forecast_df, fold_info_df: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
Metrics dataframe, forecast dataframe and dataframe with information about folds
Raises
------
ValueError:
If ``mode`` is set when ``n_folds`` are ``List[FoldMask]``.
ValueError:
If ``stride`` is set when ``n_folds`` are ``List[FoldMask]``.
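Examples
--------
A minimal sketch, assuming ``pipeline`` is a concrete pipeline and ``ts`` is a ``TSDataset`` with enough history for three folds:
>>> from etna.metrics import MAE
>>> metrics_df, forecast_df, fold_info_df = pipeline.backtest(  # doctest: +SKIP
...     ts=ts, metrics=[MAE()], n_folds=3
... )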
"""
mode_enum = self._validate_backtest_mode(n_folds=n_folds, mode=mode)
stride = self._validate_backtest_stride(n_folds=n_folds, horizon=self.horizon, stride=stride)
if joblib_params is None:
joblib_params = dict(verbose=11, backend="multiprocessing", mmap_mode="c")
if forecast_params is None:
forecast_params = dict()
self._init_backtest()
self._validate_backtest_metrics(metrics=metrics)
masks = self._prepare_fold_masks(ts=ts, masks=n_folds, mode=mode_enum, stride=stride)
self._folds = self._run_all_folds(
masks=masks,
ts=ts,
metrics=metrics,
n_jobs=n_jobs,
refit=refit,
joblib_params=joblib_params,
forecast_params=forecast_params,
)
metrics_df = self._get_backtest_metrics(aggregate_metrics=aggregate_metrics)
forecast_df = self._get_backtest_forecasts()
fold_info_df = self._get_fold_info()
tslogger.start_experiment(job_type="crossval_results", group="all")
tslogger.log_backtest_metrics(ts, metrics_df, forecast_df, fold_info_df)
tslogger.finish_experiment()
return metrics_df, forecast_df, fold_info_df