Source code for etna.pipeline.autoregressive_pipeline

import warnings
from typing import Sequence
from typing import cast

import pandas as pd
from typing_extensions import get_args

from etna.datasets import TSDataset
from etna.models.base import ContextIgnorantModelType
from etna.models.base import ContextRequiredModelType
from etna.models.base import ModelType
from etna.pipeline.base import BasePipeline
from etna.pipeline.mixins import ModelPipelineParamsToTuneMixin
from etna.pipeline.mixins import ModelPipelinePredictMixin
from etna.pipeline.mixins import SaveModelPipelineMixin
from etna.transforms import Transform


class AutoRegressivePipeline(
    ModelPipelinePredictMixin, ModelPipelineParamsToTuneMixin, SaveModelPipelineMixin, BasePipeline
):
    """Pipeline that makes regressive models autoregressive.

    The forecast for the whole horizon is built in chunks of ``step`` timestamps:
    every chunk is predicted on a dataset that already contains the predictions
    made for the previous chunks.

    Examples
    --------
    >>> from etna.datasets import generate_periodic_df
    >>> from etna.datasets import TSDataset
    >>> from etna.models import LinearPerSegmentModel
    >>> from etna.transforms import LagTransform
    >>> classic_df = generate_periodic_df(
    ...     periods=100,
    ...     start_time="2020-01-01",
    ...     n_segments=4,
    ...     period=7,
    ...     sigma=3
    ... )
    >>> df = TSDataset.to_dataset(df=classic_df)
    >>> ts = TSDataset(df, freq="D")
    >>> horizon = 7
    >>> transforms = [
    ...     LagTransform(in_column="target", lags=list(range(1, horizon+1)))
    ... ]
    >>> model = LinearPerSegmentModel()
    >>> pipeline = AutoRegressivePipeline(model, horizon, transforms, step=1)
    >>> _ = pipeline.fit(ts=ts)
    >>> forecast = pipeline.forecast()
    >>> pd.options.display.float_format = '{:,.2f}'.format
    >>> forecast[:, :, "target"]
    segment    segment_0 segment_1 segment_2 segment_3
    feature       target    target    target    target
    timestamp
    2020-04-10      9.00      9.00      4.00      6.00
    2020-04-11      5.00      2.00      7.00      9.00
    2020-04-12      0.00      4.00      7.00      9.00
    2020-04-13      0.00      5.00      9.00      7.00
    2020-04-14      1.00      2.00      1.00      6.00
    2020-04-15      5.00      7.00      4.00      7.00
    2020-04-16      8.00      6.00      2.00      0.00
    """

    def __init__(self, model: ModelType, horizon: int, transforms: Sequence[Transform] = (), step: int = 1):
        """
        Create instance of AutoRegressivePipeline with given parameters.

        Parameters
        ----------
        model:
            Instance of the etna Model
        horizon:
            Number of timestamps in the future for forecasting
        transforms:
            Sequence of the transforms
        step:
            Size of prediction for one step of forecasting

        Raises
        ------
        ValueError:
            If ``step`` is not positive.
        """
        # ``step`` is used as the stride of ``range(0, horizon, step)`` during forecasting:
        # zero makes ``range`` raise there, a negative value makes the loop body never run.
        # Fail early with a clear message instead.
        if step < 1:
            raise ValueError(f"Parameter step should be a positive integer, but step={step} was given!")
        self.model = model
        self.transforms = transforms
        self.step = step
        super().__init__(horizon=horizon)

    def fit(self, ts: TSDataset) -> "AutoRegressivePipeline":
        """Fit the AutoRegressivePipeline.

        Fit and apply given transforms to the data, then fit the model on the transformed data.
        After the model is fitted the transforms are inverted on the stored dataset.

        Parameters
        ----------
        ts:
            Dataset with timeseries data

        Returns
        -------
        :
            Fitted Pipeline instance
        """
        self.ts = ts
        ts.fit_transform(self.transforms)
        self.model.fit(ts)
        # ``self.ts`` and ``ts`` are the same object, so this undoes the transforms applied above.
        self.ts.inverse_transform(self.transforms)
        return self

    def _create_predictions_template(self, ts: TSDataset) -> pd.DataFrame:
        """Create dataframe to fill with forecasts.

        The result holds the ``target`` columns of ``ts`` with the index extended
        by the future timestamps; the rows added by the reindex are empty.
        """
        prediction_df = ts[:, :, "target"]
        last_timestamp = prediction_df.index.max()
        # Generate the range starting at the last known timestamp and drop that starting point.
        # This mirrors ``closed="right"`` without using the keyword, which was deprecated in
        # pandas 1.4 and removed in pandas 2.0.
        candidate_dates = pd.date_range(start=last_timestamp, periods=self.horizon + 1, freq=ts.freq)
        future_dates = candidate_dates[candidate_dates > last_timestamp]
        prediction_df = prediction_df.reindex(prediction_df.index.append(future_dates))
        prediction_df.index.name = "timestamp"
        return prediction_df

    def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset:
        """Make predictions.

        Parameters
        ----------
        ts:
            Dataset to forecast from
        return_components:
            If True additionally collect target components of every chunk and add them to the result

        Returns
        -------
        :
            Dataset with the last ``horizon`` timestamps of the filled predictions
        """
        prediction_df = self._create_predictions_template(ts)
        target_components_dfs = []
        # The model does not change during forecasting, so the kind of model is resolved once.
        requires_context = isinstance(self.model, get_args(ContextRequiredModelType))

        for idx_start in range(0, self.horizon, self.step):
            # the last chunk may be shorter than ``step``
            current_step = min(self.step, self.horizon - idx_start)
            current_idx_border = ts.index.shape[0] + idx_start
            # history plus the chunks that were already predicted
            current_ts = TSDataset(
                df=prediction_df.iloc[:current_idx_border],
                freq=ts.freq,
                df_exog=ts.df_exog,
                known_future=ts.known_future,
            )
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    message="TSDataset freq can't be inferred",
                    action="ignore",
                )
                warnings.filterwarnings(
                    message="You probably set wrong freq.",
                    action="ignore",
                )

                if requires_context:
                    # ``cast`` only narrows the type for the type checker; the object is unchanged
                    context_model = cast(ContextRequiredModelType, self.model)
                    current_ts_forecast = current_ts.make_future(
                        future_steps=current_step, tail_steps=context_model.context_size, transforms=self.transforms
                    )
                    current_ts_future = context_model.forecast(
                        ts=current_ts_forecast, prediction_size=current_step, return_components=return_components
                    )
                else:
                    plain_model = cast(ContextIgnorantModelType, self.model)
                    current_ts_forecast = current_ts.make_future(future_steps=current_step, transforms=self.transforms)
                    current_ts_future = plain_model.forecast(
                        ts=current_ts_forecast, return_components=return_components
                    )
            current_ts_future.inverse_transform(self.transforms)

            if return_components:
                target_components_dfs.append(current_ts_future.get_target_components())
                current_ts_future.drop_target_components()

            # values already present in the template win, missing ones are taken from the new forecast
            prediction_df = prediction_df.combine_first(current_ts_future.to_pandas()[prediction_df.columns])

        # construct dataset and add all features
        prediction_ts = TSDataset(df=prediction_df, freq=ts.freq, df_exog=ts.df_exog, known_future=ts.known_future)
        prediction_ts.transform(self.transforms)
        prediction_ts.inverse_transform(self.transforms)

        # cut only last timestamps from result dataset
        prediction_ts.df = prediction_ts.df.tail(self.horizon)
        prediction_ts.raw_df = prediction_ts.raw_df.tail(self.horizon)

        if return_components:
            target_components_df = pd.concat(target_components_dfs)
            prediction_ts.add_target_components(target_components_df=target_components_df)

        return prediction_ts

    def _predict(
        self,
        ts: TSDataset,
        start_timestamp: pd.Timestamp,
        end_timestamp: pd.Timestamp,
        prediction_interval: bool,
        quantiles: Sequence[float],
        return_components: bool = False,
    ) -> TSDataset:
        """Make in-sample predictions by delegating to the parent implementation with the same arguments."""
        return super()._predict(
            ts=ts,
            start_timestamp=start_timestamp,
            end_timestamp=end_timestamp,
            prediction_interval=prediction_interval,
            quantiles=quantiles,
            return_components=return_components,
        )