Source code for etna.models.base

import functools
from abc import ABC
from abc import abstractmethod
from typing import Any
from typing import Dict
from typing import Iterable
from typing import Optional
from typing import Sequence
from typing import Sized
from typing import Tuple
from typing import Union

import numpy as np
import pandas as pd

from etna import SETTINGS
from etna.core import SaveMixin
from etna.core.mixins import BaseMixin
from etna.datasets.tsdataset import TSDataset
from etna.distributions import BaseDistribution
from etna.loggers import tslogger
from etna.models.decorators import log_decorator
from etna.models.mixins import SaveNNMixin

if SETTINGS.torch_required:
    import torch
    from pytorch_lightning import LightningModule
    from pytorch_lightning import Trainer
    from torch.utils.data import DataLoader
    from torch.utils.data import Dataset
    from torch.utils.data import random_split
else:
    from unittest.mock import Mock

    LightningModule = Mock  # type: ignore
    SaveNNMixin = Mock  # type: ignore


class AbstractModel(SaveMixin, ABC, BaseMixin):
    """Interface for model with fit method."""

    @property
    @abstractmethod
    def context_size(self) -> int:
        """Context size of the model. Determines how many history points we ask to pass to the model."""
        pass

    @abstractmethod
    def fit(self, ts: TSDataset) -> "AbstractModel":
        """Fit model.

        Parameters
        ----------
        ts:
            Dataset with features

        Returns
        -------
        :
            Model after fit
        """
        pass

    @abstractmethod
    def get_model(self) -> Union[Any, Dict[str, Any]]:
        """Get internal model/models that are used inside etna class.

        Internal model is a model that is used inside etna to forecast segments,
        e.g. :py:class:`catboost.CatBoostRegressor` or :py:class:`sklearn.linear_model.Ridge`.

        Returns
        -------
        :
            The result can be of two types:

            * if model is multi-segment, then the result is internal model

            * if model is per-segment, then the result is dictionary where key is segment and value is internal model
        """
        pass

    def params_to_tune(self) -> Dict[str, BaseDistribution]:
        """Get grid for tuning hyperparameters.

        This is the default implementation with an empty grid.

        Returns
        -------
        :
            Empty grid.
        """
        return {}
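

# --- Illustrative sketch (not part of the module): a minimal per-segment model implementing the
# ``AbstractModel`` interface. The class name ``_MeanPerSegmentModel`` is hypothetical; the sketch
# only shows which members a concrete subclass has to provide.
class _MeanPerSegmentModel(AbstractModel):
    def __init__(self):
        self._means: Dict[str, float] = {}

    @property
    def context_size(self) -> int:
        # no history is required at prediction time
        return 0

    def fit(self, ts: TSDataset) -> "_MeanPerSegmentModel":
        # wide dataframe with (segment, feature) columns
        df = ts.to_pandas()
        for segment in ts.segments:
            self._means[segment] = float(df[(segment, "target")].mean())
        return self

    def get_model(self) -> Dict[str, float]:
        # per-segment model: dictionary where key is segment and value is its fitted "model"
        return self._means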


class NonPredictionIntervalContextIgnorantAbstractModel(AbstractModel):
    """Interface for models that don't support prediction intervals and don't need context for prediction."""

    @property
    def context_size(self) -> int:
        """Context size of the model. Determines how many history points we ask to pass to the model.

        Zero for this model.
        """
        return 0

    @abstractmethod
    def forecast(self, ts: TSDataset, return_components: bool = False) -> TSDataset:
        """Make predictions.

        Parameters
        ----------
        ts:
            Dataset with features
        return_components:
            If True additionally returns forecast components

        Returns
        -------
        :
            Dataset with predictions
        """
        pass

    @abstractmethod
    def predict(self, ts: TSDataset, return_components: bool = False) -> TSDataset:
        """Make predictions using true values as autoregression context if possible (teacher forcing).

        Parameters
        ----------
        ts:
            Dataset with features
        return_components:
            If True additionally returns prediction components

        Returns
        -------
        :
            Dataset with predictions
        """
        pass


class NonPredictionIntervalContextRequiredAbstractModel(AbstractModel):
    """Interface for models that don't support prediction intervals and need context for prediction."""

    @abstractmethod
    def forecast(self, ts: TSDataset, prediction_size: int, return_components: bool = False) -> TSDataset:
        """Make predictions.

        Parameters
        ----------
        ts:
            Dataset with features
        prediction_size:
            Number of last timestamps to leave after making prediction.
            Previous timestamps will be used as a context for models that require it.
        return_components:
            If True additionally returns forecast components

        Returns
        -------
        :
            Dataset with predictions
        """
        pass

    @abstractmethod
    def predict(self, ts: TSDataset, prediction_size: int, return_components: bool = False) -> TSDataset:
        """Make predictions using true values as autoregression context if possible (teacher forcing).

        Parameters
        ----------
        ts:
            Dataset with features
        prediction_size:
            Number of last timestamps to leave after making prediction.
            Previous timestamps will be used as a context for models that require it.
        return_components:
            If True additionally returns prediction components

        Returns
        -------
        :
            Dataset with predictions
        """
        pass


class PredictionIntervalContextIgnorantAbstractModel(AbstractModel):
    """Interface for models that support prediction intervals and don't need context for prediction."""

    @property
    def context_size(self) -> int:
        """Context size of the model. Determines how many history points we ask to pass to the model.

        Zero for this model.
        """
        return 0

    @abstractmethod
    def forecast(
        self,
        ts: TSDataset,
        prediction_interval: bool = False,
        quantiles: Sequence[float] = (0.025, 0.975),
        return_components: bool = False,
    ) -> TSDataset:
        """Make predictions.

        Parameters
        ----------
        ts:
            Dataset with features
        prediction_interval:
            If True returns prediction interval for forecast
        quantiles:
            Levels of prediction distribution. By default 2.5% and 97.5% are taken to form a 95% prediction interval
        return_components:
            If True additionally returns forecast components

        Returns
        -------
        :
            Dataset with predictions
        """
        pass

    @abstractmethod
    def predict(
        self,
        ts: TSDataset,
        prediction_interval: bool = False,
        quantiles: Sequence[float] = (0.025, 0.975),
        return_components: bool = False,
    ) -> TSDataset:
        """Make predictions using true values as autoregression context if possible (teacher forcing).

        Parameters
        ----------
        ts:
            Dataset with features
        prediction_interval:
            If True returns prediction interval for forecast
        quantiles:
            Levels of prediction distribution. By default 2.5% and 97.5% are taken to form a 95% prediction interval
        return_components:
            If True additionally returns prediction components

        Returns
        -------
        :
            Dataset with predictions
        """
        pass


class PredictionIntervalContextRequiredAbstractModel(AbstractModel):
    """Interface for models that support prediction intervals and need context for prediction."""

    @abstractmethod
    def forecast(
        self,
        ts: TSDataset,
        prediction_size: int,
        prediction_interval: bool = False,
        quantiles: Sequence[float] = (0.025, 0.975),
        return_components: bool = False,
    ) -> TSDataset:
        """Make predictions.

        Parameters
        ----------
        ts:
            Dataset with features
        prediction_size:
            Number of last timestamps to leave after making prediction.
            Previous timestamps will be used as a context for models that require it.
        prediction_interval:
            If True returns prediction interval for forecast
        quantiles:
            Levels of prediction distribution. By default 2.5% and 97.5% are taken to form a 95% prediction interval
        return_components:
            If True additionally returns forecast components

        Returns
        -------
        :
            Dataset with predictions
        """
        pass

    @abstractmethod
    def predict(
        self,
        ts: TSDataset,
        prediction_size: int,
        prediction_interval: bool = False,
        quantiles: Sequence[float] = (0.025, 0.975),
        return_components: bool = False,
    ) -> TSDataset:
        """Make predictions using true values as autoregression context if possible (teacher forcing).

        Parameters
        ----------
        ts:
            Dataset with features
        prediction_size:
            Number of last timestamps to leave after making prediction.
            Previous timestamps will be used as a context for models that require it.
        prediction_interval:
            If True returns prediction interval for forecast
        quantiles:
            Levels of prediction distribution. By default 2.5% and 97.5% are taken to form a 95% prediction interval
        return_components:
            If True additionally returns prediction components

        Returns
        -------
        :
            Dataset with predictions
        """
        pass


class BaseAdapter(ABC):
    """Base class for model adapters."""

    @abstractmethod
    def get_model(self) -> Any:
        """Get internal model that is used inside etna class.

        Internal model is a model that is used inside etna to forecast segments,
        e.g. :py:class:`catboost.CatBoostRegressor` or :py:class:`sklearn.linear_model.Ridge`.

        Returns
        -------
        :
            Internal model
        """
        pass


class DeepAbstractNet(ABC):
    """Interface for etna native deep models."""

    @abstractmethod
    def make_samples(self, df: pd.DataFrame, encoder_length: int, decoder_length: int) -> Iterable[dict]:
        """Make samples from input slice of TSDataset.

        Parameters
        ----------
        df:
            per-segment slice of the dataset as a DataFrame
        encoder_length:
            encoder length
        decoder_length:
            decoder length

        Returns
        -------
        :
            samples of input slices
        """
        pass

    @abstractmethod
    def step(self, batch: dict, *args, **kwargs) -> Tuple["torch.Tensor", "torch.Tensor", "torch.Tensor"]:
        """Make batch step.

        Parameters
        ----------
        batch:
            Batch with data to make inference on.

        Returns
        -------
        :
            loss, true_target, prediction_target
        """
        pass


class DeepBaseAbstractModel(ABC):
    """Interface for holding class of etna native deep models."""

    @abstractmethod
    def raw_fit(self, torch_dataset: "Dataset") -> "DeepBaseAbstractModel":
        """Fit model with a torch-like Dataset.

        Parameters
        ----------
        torch_dataset:
            Samples with data to fit on.

        Returns
        -------
        :
            Trained Model
        """
        pass

    @abstractmethod
    def raw_predict(self, torch_dataset: "Dataset") -> Dict[Tuple[str, str], np.ndarray]:
        """Make inference on a torch-like Dataset.

        Parameters
        ----------
        torch_dataset:
            Samples with data to make inference on.

        Returns
        -------
        :
            Predictions for each segment
        """
        pass
[docs] @abstractmethod def get_model(self) -> "DeepBaseNet": """Get model. Returns ------- : Torch Module """ pass


class DeepBaseNet(DeepAbstractNet, LightningModule):
    """Class for partially implemented LightningModule interface."""

    def __init__(self):
        """Init DeepBaseNet."""
        super().__init__()

    def training_step(self, batch: dict, *args, **kwargs):  # type: ignore
        """Training step.

        Parameters
        ----------
        batch:
            batch of data

        Returns
        -------
        :
            loss
        """
        loss, _, _ = self.step(batch, *args, **kwargs)  # type: ignore
        self.log("train_loss", loss, on_epoch=True)
        return loss

    def validation_step(self, batch: dict, *args, **kwargs):  # type: ignore
        """Validation step.

        Parameters
        ----------
        batch:
            batch of data

        Returns
        -------
        :
            loss
        """
        loss, _, _ = self.step(batch, *args, **kwargs)  # type: ignore
        self.log("val_loss", loss, on_epoch=True)
        return loss
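

# --- Illustrative sketch (not part of the module): a minimal concrete ``DeepBaseNet``.
# The name ``_ExampleNet``, the sample keys ("encoder_target", "decoder_target", "segment") and the
# linear architecture are assumptions made for the example; concrete etna nets define their own
# sample layout. A subclass has to provide ``make_samples``, ``step``, ``forward`` and the
# LightningModule method ``configure_optimizers``.
class _ExampleNet(DeepBaseNet):
    def __init__(self, encoder_length: int, decoder_length: int, lr: float = 1e-3):
        super().__init__()
        self.lr = lr
        # map the whole encoder window to the whole decoder window in one shot;
        # for forecasting, ``prediction_size`` has to equal ``decoder_length`` for this toy net
        self.projection = torch.nn.Linear(encoder_length, decoder_length)

    def forward(self, batch: dict):  # type: ignore
        encoder_target = batch["encoder_target"]  # (batch_size, encoder_length, 1)
        flat = encoder_target.squeeze(-1)
        return self.projection(flat).unsqueeze(-1)  # (batch_size, decoder_length, 1)

    def step(self, batch: dict, *args, **kwargs):  # type: ignore
        true_target = batch["decoder_target"]
        prediction = self(batch)
        loss = torch.nn.functional.mse_loss(prediction, true_target)
        return loss, true_target, prediction

    def make_samples(self, df: pd.DataFrame, encoder_length: int, decoder_length: int) -> Iterable[dict]:
        # yield sliding windows over the per-segment target column
        values = df["target"].values
        segment = df["segment"].values[0]
        total = encoder_length + decoder_length
        for start in range(len(values) - total + 1):
            yield {
                "encoder_target": values[start : start + encoder_length, None].astype(np.float32),
                "decoder_target": values[start + encoder_length : start + total, None].astype(np.float32),
                "segment": segment,
            }

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)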


class DeepBaseModel(DeepBaseAbstractModel, SaveNNMixin, NonPredictionIntervalContextRequiredAbstractModel):
    """Class for partially implemented interfaces for holding deep models."""

    def __init__(
        self,
        *,
        net: DeepBaseNet,
        encoder_length: int,
        decoder_length: int,
        train_batch_size: int,
        test_batch_size: int,
        trainer_params: Optional[dict],
        train_dataloader_params: Optional[dict],
        test_dataloader_params: Optional[dict],
        val_dataloader_params: Optional[dict],
        split_params: Optional[dict],
    ):
        """Init DeepBaseModel.

        Parameters
        ----------
        net:
            network to train
        encoder_length:
            encoder length
        decoder_length:
            decoder length
        train_batch_size:
            batch size for training
        test_batch_size:
            batch size for testing
        trainer_params:
            PyTorch Lightning trainer parameters (api reference :py:class:`pytorch_lightning.trainer.trainer.Trainer`)
        train_dataloader_params:
            parameters for train dataloader, e.g. sampler (api reference :py:class:`torch.utils.data.DataLoader`)
        test_dataloader_params:
            parameters for test dataloader
        val_dataloader_params:
            parameters for validation dataloader
        split_params:
            dictionary with parameters for :py:func:`torch.utils.data.random_split` for train-test splitting

            * **train_size**: (*float*) value from 0 to 1 - fraction of samples to use for training

            * **generator**: (*Optional[torch.Generator]*) - generator for reproducible train-test splitting

            * **torch_dataset_size**: (*Optional[int]*) - number of samples in dataset, in case the dataset doesn't implement ``__len__``
        """
        super().__init__()
        self.net = net
        self.encoder_length = encoder_length
        self.decoder_length = decoder_length
        self.train_batch_size = train_batch_size
        self.test_batch_size = test_batch_size
        self.train_dataloader_params = {} if train_dataloader_params is None else train_dataloader_params
        self.test_dataloader_params = {} if test_dataloader_params is None else test_dataloader_params
        self.val_dataloader_params = {} if val_dataloader_params is None else val_dataloader_params
        self.trainer_params = {} if trainer_params is None else trainer_params
        self.split_params = {} if split_params is None else split_params
        self.trainer: Optional[Trainer] = None

    @property
    def context_size(self) -> int:
        """Context size of the model."""
        return self.encoder_length

    @log_decorator
    def fit(self, ts: TSDataset) -> "DeepBaseModel":
        """Fit model.

        Parameters
        ----------
        ts:
            TSDataset with features

        Returns
        -------
        :
            Model after fit
        """
        torch_dataset = ts.to_torch_dataset(
            functools.partial(
                self.net.make_samples, encoder_length=self.encoder_length, decoder_length=self.decoder_length
            ),
            dropna=True,
        )
        self.raw_fit(torch_dataset)
        return self

    def raw_fit(self, torch_dataset: "Dataset") -> "DeepBaseModel":
        """Fit model on a torch-like Dataset.

        Parameters
        ----------
        torch_dataset:
            Torch-like dataset for model fit

        Returns
        -------
        :
            Model after fit
        """
        if self.split_params:
            if isinstance(torch_dataset, Sized):
                torch_dataset_size = len(torch_dataset)
            else:
                torch_dataset_size = self.split_params["torch_dataset_size"]
            if torch_dataset_size is None:
                raise ValueError("torch_dataset_size must be provided if torch_dataset is not sized")
            train_size = self.split_params["train_size"]
            torch_dataset_train_size = int(torch_dataset_size * train_size)
            torch_dataset_val_size = torch_dataset_size - torch_dataset_train_size
            train_dataset, val_dataset = random_split(
                torch_dataset,
                lengths=[
                    torch_dataset_train_size,
                    torch_dataset_val_size,
                ],
                generator=self.split_params.get("generator"),
            )
            train_dataloader = DataLoader(
                train_dataset, batch_size=self.train_batch_size, shuffle=True, **self.train_dataloader_params
            )
            val_dataloader: Optional[DataLoader] = DataLoader(
                val_dataset, batch_size=self.test_batch_size, shuffle=False, **self.val_dataloader_params
            )
        else:
            train_dataloader = DataLoader(
                torch_dataset, batch_size=self.train_batch_size, shuffle=True, **self.train_dataloader_params
            )
            val_dataloader = None

        if "logger" not in self.trainer_params:
            self.trainer_params["logger"] = tslogger.pl_loggers
        else:
            self.trainer_params["logger"] += tslogger.pl_loggers

        self.trainer = Trainer(**self.trainer_params)
        self.trainer.fit(self.net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)
        return self
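
    # Example (illustrative numbers, not part of the module): with 1000 samples and
    # ``split_params={"train_size": 0.75}``, ``raw_fit`` above trains on int(1000 * 0.75) = 750
    # samples and validates on the remaining 250.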

    def raw_predict(self, torch_dataset: "Dataset") -> Dict[Tuple[str, str], np.ndarray]:
        """Make inference on a torch-like Dataset.

        Parameters
        ----------
        torch_dataset:
            Torch-like dataset for model inference

        Returns
        -------
        :
            Dictionary with predictions
        """
        test_dataloader = DataLoader(
            torch_dataset, batch_size=self.test_batch_size, shuffle=False, **self.test_dataloader_params
        )

        predictions_dict = dict()
        self.net.eval()
        with torch.no_grad():
            for batch in test_dataloader:
                segments = batch["segment"]
                predictions = self.net(batch)
                predictions_array = predictions.numpy()
                for idx, segment in enumerate(segments):
                    predictions_dict[(segment, "target")] = predictions_array[idx, :]  # TODO: rethink in case of issue-791

        return predictions_dict

    @log_decorator
    def forecast(self, ts: "TSDataset", prediction_size: int, return_components: bool = False) -> "TSDataset":
        """Make predictions.

        This method will make autoregressive predictions.

        Parameters
        ----------
        ts:
            Dataset with features and expected decoder length for context
        prediction_size:
            Number of last timestamps to leave after making prediction.
            Previous timestamps will be used as a context.
        return_components:
            If True additionally returns forecast components

        Returns
        -------
        :
            Dataset with predictions
        """
        if return_components:
            raise NotImplementedError("This mode isn't currently implemented!")

        expected_length = prediction_size + self.encoder_length
        if len(ts.index) < expected_length:
            raise ValueError(
                "Given context isn't big enough, try to decrease context_size, prediction_size or increase length of given dataset!"
            )

        test_dataset = ts.to_torch_dataset(
            make_samples=functools.partial(
                self.net.make_samples, encoder_length=self.encoder_length, decoder_length=prediction_size
            ),
            dropna=False,
        )
        predictions = self.raw_predict(test_dataset)
        end_idx = len(ts.index)
        future_ts = ts.tsdataset_idx_slice(start_idx=end_idx - prediction_size, end_idx=end_idx)
        for (segment, feature_nm), value in predictions.items():
            # we don't want to change dtype after assignment, but a cast to float32 can happen here
            future_ts.df.loc[:, pd.IndexSlice[segment, feature_nm]] = value[:prediction_size, :].astype(np.float64)

        return future_ts
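
    # Example (illustrative numbers, not part of the module): with ``encoder_length=7`` and
    # ``prediction_size=3``, the dataset passed to ``forecast`` above must contain at least
    # 7 + 3 = 10 timestamps; otherwise the ValueError above is raised.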

    @log_decorator
    def predict(
        self,
        ts: "TSDataset",
        prediction_size: int,
        return_components: bool = False,
    ) -> "TSDataset":
        """Make predictions.

        This method will make predictions using true values instead of the values predicted on previous steps.
        It can be useful for making in-sample forecasts.

        Parameters
        ----------
        ts:
            Dataset with features and expected decoder length for context
        prediction_size:
            Number of last timestamps to leave after making prediction.
            Previous timestamps will be used as a context.
        return_components:
            If True additionally returns prediction components

        Returns
        -------
        :
            Dataset with predictions
        """
        raise NotImplementedError("Method predict isn't currently implemented!")

    def get_model(self) -> "DeepBaseNet":
        """Get model.

        Returns
        -------
        :
            Torch Module
        """
        return self.net
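

# --- Illustrative usage sketch (not part of the module), continuing the hypothetical ``_ExampleNet``
# above: a ``DeepBaseModel`` subclass only needs to build its net and forward the training settings
# to ``DeepBaseModel.__init__``. The parameter values below are arbitrary examples.
class _ExampleDeepModel(DeepBaseModel):
    def __init__(self, encoder_length: int = 7, decoder_length: int = 3, lr: float = 1e-3):
        super().__init__(
            net=_ExampleNet(encoder_length=encoder_length, decoder_length=decoder_length, lr=lr),
            encoder_length=encoder_length,
            decoder_length=decoder_length,
            train_batch_size=32,
            test_batch_size=32,
            trainer_params={"max_epochs": 10},
            train_dataloader_params=None,
            test_dataloader_params=None,
            val_dataloader_params=None,
            split_params={"train_size": 0.8},
        )
# With such a subclass the usual workflow would be ``model.fit(ts)`` followed by
# ``model.forecast(ts_with_context, prediction_size=decoder_length)``.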


ModelType = Union[
    NonPredictionIntervalContextIgnorantAbstractModel,
    NonPredictionIntervalContextRequiredAbstractModel,
    PredictionIntervalContextIgnorantAbstractModel,
    PredictionIntervalContextRequiredAbstractModel,
]

ContextRequiredModelType = Union[
    NonPredictionIntervalContextRequiredAbstractModel,
    PredictionIntervalContextRequiredAbstractModel,
]

ContextIgnorantModelType = Union[
    NonPredictionIntervalContextIgnorantAbstractModel,
    PredictionIntervalContextIgnorantAbstractModel,
]

PredictionIntervalModelType = Union[
    PredictionIntervalContextIgnorantAbstractModel, PredictionIntervalContextRequiredAbstractModel
]

NonPredictionIntervalModelType = Union[
    NonPredictionIntervalContextIgnorantAbstractModel, NonPredictionIntervalContextRequiredAbstractModel
]
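

# --- Illustrative sketch (not part of the module): the aliases above are ``Union`` types, so code
# that needs runtime dispatch can check a model against a union's members via ``typing.get_args``.
# The helper name ``_requires_context`` is hypothetical and only shows the pattern.
from typing import get_args


def _requires_context(model: ModelType) -> bool:
    # context-required models take ``prediction_size`` in ``forecast``/``predict``
    return isinstance(model, get_args(ContextRequiredModelType))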