Source code for fedot.core.operations.evaluation.operation_implementations.data_operations.ts_transformations
from copy import copy, deepcopy
from random import random
from typing import Optional, Union
import numpy as np
import pandas as pd
from fedot.utilities.window_size_selector import WindowSizeSelector, WindowSizeSelectorMethodsEnum
from golem.core.log import default_log
from scipy.ndimage import gaussian_filter
from sklearn.decomposition import TruncatedSVD
from fedot.core.data.data import InputData, OutputData
from fedot.core.operations.evaluation.operation_implementations.implementation_interfaces import (
DataOperationImplementation
)
from fedot.core.operations.operation_parameters import OperationParameters
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.preprocessing.data_types import TYPE_TO_ID
class LaggedImplementation(DataOperationImplementation):
def __init__(self, params: Optional[OperationParameters]):
super().__init__(params)
self.window_size_minimum = None
self.sparse_transform = False
self.use_svd = False
self.features_columns = None
# Define logger object
self.log = default_log(self)
@property
def window_size(self) -> Optional[int]:
window_size = self.params.get('window_size')
if window_size:
window_size = round(window_size)
return window_size
@property
def n_components(self) -> Optional[int]:
return self.params.get('n_components')
def fit(self, input_data: InputData):
""" Class doesn't support fit operation
Args:
input_data: data with features, target and ids to process
"""
pass
def transform(self, input_data: InputData) -> OutputData:
""" Method for transforming the time series into lagged form for the predict stage
Args:
input_data: data with features, target and ids to process
Returns:
output data with transformed features table
"""
new_input_data = copy(input_data)
forecast_length = new_input_data.task.task_params.forecast_length
# Correct window size parameter
self._check_and_correct_window_size(new_input_data.features, forecast_length)
self._apply_transformation_for_predict(new_input_data)
output_data = self._convert_to_output(new_input_data,
self.features_columns,
data_type=DataTypesEnum.table)
self._update_column_types(output_data)
return output_data
def transform_for_fit(self, input_data: InputData) -> OutputData:
"""Method for transforming the time series into lagged form for the fit stage
Args:
input_data: data with features, target and ids to process
Returns:
output data with transformed features table
"""
new_input_data = copy(input_data)
forecast_length = new_input_data.task.task_params.forecast_length
# Correct window size parameter
self._check_and_correct_window_size(new_input_data.features, forecast_length)
target = np.array(new_input_data.target)
features = np.array(new_input_data.features)
old_idx = new_input_data.idx
new_target, new_idx = self._apply_transformation_for_fit(new_input_data, features,
target, forecast_length, old_idx)
# Update target for Input Data
new_input_data.target = new_target
new_input_data.idx = new_idx
output_data = self._convert_to_output(new_input_data,
self.features_columns,
data_type=DataTypesEnum.table)
self._update_column_types(output_data)
return output_data
def _check_and_correct_window_size(self, time_series: np.ndarray, forecast_length: int):
""" Method checks whether the length of the time series is sufficient for the
lagged transformation and corrects the window size if it is not
Args:
time_series: time series for transformation
forecast_length: forecast length
"""
max_allowed_window_size = max(1, len(time_series) - forecast_length - 1)
if self.window_size == 0:
selector = WindowSizeSelector(method=WindowSizeSelectorMethodsEnum.HAC, window_range=(5, 60))
new = int(selector.apply(time_series) * time_series.shape[0] * 0.01)
new = min(max_allowed_window_size, new)
self.log.message((f"Window size of lagged transformation was changed "
f"by WindowSizeSelector from {self.params.get('window_size')} to {new}"))
self.params.update(window_size=new)
# Maximum threshold
if self.window_size > max_allowed_window_size:
new = int(random() * max_allowed_window_size)
new = min(new, max_allowed_window_size)
new = max(new, self.window_size_minimum)
self.log.info(("Window size of lagged transformation was changed "
f"from {self.params.get('window_size')} to {new}"))
self.params.update(window_size=new)
# Minimum threshold
if self.window_size < self.window_size_minimum:
self.log.info((f"Warning: window size of lagged transformation was changed "
f"from {self.params.get('window_size')} to {self.window_size_minimum}"))
self.params.update(window_size=self.window_size_minimum)
def _update_column_types(self, output_data: OutputData):
"""Update column types after lagged transformation. All features become ``float``
"""
_, features_n_cols = output_data.predict.shape
feature_type_ids = np.array([TYPE_TO_ID[float]] * features_n_cols)
col_type_ids = {'features': feature_type_ids}
if output_data.target is not None and len(output_data.target.shape) > 1:
_, target_n_cols = output_data.target.shape
target_type_ids = np.array([TYPE_TO_ID[float]] * target_n_cols)
col_type_ids['target'] = target_type_ids
output_data.supplementary_data.col_type_ids = col_type_ids
def _apply_transformation_for_fit(self, input_data: InputData, features: np.array, target: np.array,
forecast_length: int, old_idx: np.array):
"""Apply lagged transformation on each time series in the current dataset
"""
# Shape of the time series
if len(features.shape) > 1:
# Multivariate time series
n_elements, n_time_series = features.shape
else:
n_time_series = 1
all_transformed_features = None
for current_ts_id in range(n_time_series):
# For each time series in the array
if n_time_series == 1:
current_ts = features
else:
current_ts = np.ravel(features[:, current_ts_id])
# Prepare features for training
new_idx, transformed_cols = ts_to_table(idx=old_idx,
time_series=current_ts,
window_size=self.window_size,
is_lag=True)
# Apply sparse transformation to the matrix of lagged features
if self.sparse_transform:
transformed_cols = _sparse_matrix(self.log,
transformed_cols,
self.n_components,
self.use_svd)
# Transform target
current_target = self._current_target_for_each_ts(current_ts_id, target)
new_idx, transformed_cols, new_target = prepare_target(all_idx=input_data.idx,
idx=new_idx,
features_columns=transformed_cols,
target=current_target,
forecast_length=forecast_length)
if current_ts_id == 0:
# Init full lagged table
all_transformed_features = transformed_cols
all_transformed_target = new_target
all_transformed_idx = np.array(new_idx)
else:
all_transformed_features, all_transformed_target, all_transformed_idx = self.stack_by_type_fit(
input_data, all_transformed_features,
all_transformed_target, all_transformed_idx,
transformed_cols, new_target, new_idx)
input_data.features = all_transformed_features
self.features_columns = all_transformed_features
return all_transformed_target, all_transformed_idx
def stack_by_type_fit(self, input_data, all_features, all_target, all_idx, features, target, idx):
"""Apply the stacking function for multi_ts and multivariate ts types on the fit step
"""
functions_by_type = {
DataTypesEnum.multi_ts: self._stack_multi_ts,
DataTypesEnum.ts: self._stack_multi_variable
}
stack_function = functions_by_type.get(input_data.data_type)
return stack_function(all_features, all_target, all_idx, features, target, idx)
def _stack_multi_variable(self, all_features: np.array,
all_target: np.array,
all_idx: np.array,
features: np.array,
target: np.array,
idx: Union[list, np.array]):
"""Horizontally stack tables as multiple variables extends features for training
Args:
all_features: ``array`` with all features for adding new
all_target: ``array`` with all target (does not change)
all_idx: ``array`` with all indices (does not change)
features: ``array`` with new features for adding
target: ``array`` with new target for adding
idx: ``array`` with new idx for adding
Returns:
the updated ``all_features``, ``all_target``, ``all_idx`` tuple
"""
all_features = np.hstack((all_features, features))
return all_features, all_target, all_idx
def _stack_multi_ts(self, all_features: np.array,
all_target: np.array,
all_idx: np.array,
features: np.array,
target: np.array,
idx: Union[list, np.array]):
"""Vertically stack tables as multi_ts data extends training set as combination of train and target
Args:
all_features: ``array`` with all features for adding new
all_target: ``array`` with all target
all_idx: ``array`` with all indices
features: ``array`` with new features for adding
target: ``array`` with new target for adding
idx: ``array`` with new idx for adding
Returns:
the updated ``all_features``, ``all_target``, ``all_idx`` tuple
"""
all_features = np.vstack((all_features, features))
all_target = np.vstack((all_target, target))
all_idx = np.hstack((all_idx, np.array(idx)))
return all_features, all_target, all_idx
def _current_target_for_each_ts(self, current_ts_id, target):
"""Returns target for each time-series
"""
if len(target.shape) > 1:
# if multi_ts case
# Wrap the id around the number of target columns
current_ts_id = current_ts_id % target.shape[1]
return target[:, current_ts_id]
else:
# if multivariate case
return target
def _apply_transformation_for_predict(self, input_data: InputData):
"""Apply lagged transformation for every column (time series) in the dataset
"""
old_idx = copy(input_data.idx)
if len(input_data.features.shape) > 1:
# Multivariate time series
n_elements, n_time_series = input_data.features.shape
else:
n_time_series = 1
all_transformed_features = None
for current_ts_id in range(n_time_series):
# For each time series
if n_time_series == 1:
current_ts = input_data.features
else:
current_ts = np.ravel(input_data.features[:, current_ts_id])
if self.sparse_transform:
self.log.debug('Sparse lagged transformation applied. If new data were used, call the fit method')
transformed_cols = self._update_features_for_sparse(current_ts, old_idx)
# Take last row in the lagged table and reshape into array with 1 row and n columns
current_ts = transformed_cols[-1].reshape(1, -1)
# Take last window_size elements for current ts
last_part_of_ts = current_ts[-self.window_size:].reshape(1, -1)
if current_ts_id == 0:
all_transformed_features = last_part_of_ts
else:
all_transformed_features = self.stack_by_type_predict(input_data,
all_transformed_features,
last_part_of_ts)
if input_data.data_type == DataTypesEnum.multi_ts:
all_transformed_features = np.expand_dims(all_transformed_features[0], axis=0)
self.features_columns = all_transformed_features
return all_transformed_features
def stack_by_type_predict(self, input_data, all_features, part_to_add):
"""Apply the stacking function for multi_ts and multivariate ts types on the predict step
"""
if input_data.data_type == DataTypesEnum.multi_ts:
# for multi_ts
all_features = np.vstack((all_features, part_to_add))
if input_data.data_type == DataTypesEnum.ts:
# for multivariate ts
all_features = np.hstack((all_features, part_to_add))
return all_features
def _update_features_for_sparse(self, time_series: np.array, idx: np.array):
"""Make the sparse matrix that will be used during forecasting
"""
# Prepare features for training
new_idx, transformed_cols = ts_to_table(idx=idx,
time_series=time_series,
window_size=self.window_size,
is_lag=True)
# Apply sparse transformation
transformed_cols = _sparse_matrix(self.log,
transformed_cols,
self.n_components,
self.use_svd)
return transformed_cols
class SparseLaggedTransformationImplementation(LaggedImplementation):
"""Implementation of sparse lagged transformation for time series forecasting
"""
def __init__(self, params: Optional[OperationParameters]):
super().__init__(params)
self.sparse_transform = True
self.window_size_minimum = 4
class LaggedTransformationImplementation(LaggedImplementation):
"""Implementation of lagged transformation for time series forecasting
"""
def __init__(self, params: Optional[OperationParameters]):
super().__init__(params)
self.window_size_minimum = 1
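# Illustrative usage sketch (not part of the original module), assuming
# OperationParameters accepts keyword parameters and that InputData/Task are
# the standard FEDOT dataclasses:
def _example_lagged_transformation():
    from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams

    ts = np.arange(50, dtype=float)
    task = Task(TaskTypesEnum.ts_forecasting, TsForecastingParams(forecast_length=5))
    input_data = InputData(idx=np.arange(len(ts)), features=ts, target=ts,
                           task=task, data_type=DataTypesEnum.ts)
    operation = LaggedTransformationImplementation(OperationParameters(window_size=10))
    output = operation.transform_for_fit(input_data)
    # output.predict is a lagged feature table with 10 columns (one per lag)
    return output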
class TsSmoothingImplementation(DataOperationImplementation):
def __init__(self, params: Optional[OperationParameters]):
super().__init__(params)
@property
def window_size(self) -> int:
return round(self.params.setdefault('window_size', 10))
def fit(self, input_data: InputData):
"""Class doesn't support fit operation
Args:
input_data: data with features, target and ids to process
"""
pass
def transform(self, input_data: InputData) -> OutputData:
"""Method for smoothing time series
Args:
input_data: data with features, target and ids to process
Returns:
output data with smoothed time series
"""
source_ts = input_data.features
if input_data.data_type == DataTypesEnum.multi_ts:
full_smoothed_ts = []
for ts_n in range(source_ts.shape[1]):
ts = pd.Series(source_ts[:, ts_n])
smoothed_ts = self._apply_smoothing_to_series(ts)
full_smoothed_ts.append(smoothed_ts)
output_data = self._convert_to_output(input_data,
np.array(full_smoothed_ts).T,
data_type=input_data.data_type)
else:
source_ts = pd.Series(input_data.features)
smoothed_ts = np.ravel(self._apply_smoothing_to_series(source_ts))
output_data = self._convert_to_output(input_data,
smoothed_ts,
data_type=input_data.data_type)
return output_data
def _apply_smoothing_to_series(self, ts):
smoothed_ts = ts.rolling(window=self.window_size).mean()
smoothed_ts = np.array(smoothed_ts)
# Fill the first NaNs with source values
smoothed_ts[:self.window_size] = ts[:self.window_size]
return smoothed_ts
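# Illustrative sketch (not part of the original module): the rolling-mean
# smoothing used by _apply_smoothing_to_series on a toy series.
def _example_rolling_smoothing():
    ts = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0])
    window_size = 3
    smoothed = np.array(ts.rolling(window=window_size).mean())
    # The first (window_size - 1) values are NaN, so they are replaced
    # with the source values, exactly as in the implementation above
    smoothed[:window_size] = ts[:window_size]
    # smoothed -> [1.0, 2.0, 3.0, 3.0, 4.0]
    return smoothed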
class ExogDataTransformationImplementation(DataOperationImplementation):
def __init__(self, params: Optional[OperationParameters]):
super().__init__(params)
def fit(self, input_data: InputData):
""" Class doesn't support fit operation
Args:
input_data: data with features, target and ids to process
"""
pass
def transform(self, input_data: InputData) -> OutputData:
""" Method for representing the exogenous time series as a feature table for the predict stage
Args:
input_data: data with features, target and ids to process
Returns:
output data with features as columns
"""
copied_data = copy(input_data)
parameters = copied_data.task.task_params
forecast_length = parameters.forecast_length
features_columns = np.array(copied_data.features)[-forecast_length:]
copied_data.idx = copied_data.idx[-forecast_length:]
features_columns = features_columns.reshape(1, -1)
output_data = self._convert_to_output(copied_data,
features_columns,
data_type=DataTypesEnum.table)
return output_data
def transform_for_fit(self, input_data: InputData) -> OutputData:
"""Method for representing the exogenous time series as a feature table for the fit stage
Args:
input_data: data with features, target and ids to process
Returns:
output data with features as columns
"""
copied_data = copy(input_data)
parameters = copied_data.task.task_params
old_idx = copied_data.idx
forecast_length = parameters.forecast_length
# Transform features in "target-like way"
_, _, features_columns = prepare_target(all_idx=input_data.idx,
idx=old_idx,
features_columns=copied_data.features,
target=copied_data.features,
forecast_length=forecast_length)
# Transform target
new_idx, _, new_target = prepare_target(all_idx=input_data.idx,
idx=old_idx,
features_columns=copied_data.features,
target=copied_data.target,
forecast_length=forecast_length)
copied_data.target = new_target
copied_data.idx = new_idx
output_data = self._convert_to_output(copied_data,
features_columns,
data_type=DataTypesEnum.table)
return output_data
class GaussianFilterImplementation(DataOperationImplementation):
def __init__(self, params: Optional[OperationParameters]):
super().__init__(params)
def fit(self, input_data: InputData):
""" Class doesn't support fit operation
Args:
input_data: data with features, target and ids to process
"""
pass
def transform(self, input_data: InputData) -> OutputData:
""" Method for smoothing time series for predict stage
Args:
input_data: data with features, target and ids to process
Returns:
output data with smoothed time series
"""
source_ts = np.array(input_data.features)
# Apply smoothing operation
sigma = round(self.params.setdefault('sigma', 1))
smoothed_ts = gaussian_filter(source_ts, sigma=sigma)
smoothed_ts = np.array(smoothed_ts)
if input_data.data_type != DataTypesEnum.multi_ts:
smoothed_ts = np.ravel(smoothed_ts)
output_data = self._convert_to_output(input_data,
smoothed_ts,
data_type=input_data.data_type)
return output_data
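# Illustrative sketch (not part of the original module): one-dimensional
# Gaussian smoothing with scipy, as applied in transform above.
def _example_gaussian_smoothing():
    noisy_ts = np.sin(np.linspace(0, 10, 200)) + np.random.normal(0, 0.2, 200)
    smoothed_ts = gaussian_filter(noisy_ts, sigma=2)
    return smoothed_ts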
class NumericalDerivativeFilterImplementation(DataOperationImplementation):
def __init__(self, params: OperationParameters):
super().__init__(params)
self.max_poly_degree = 5
self.default_poly_degree = 2
self.default_order = 1
self.log = default_log(self)
self._correct_params()
@property
def poly_degree(self) -> int:
return int(self.params.get('poly_degree'))
@property
def order(self) -> int:
return int(self.params.get('order'))
@property
def window_size(self) -> int:
return int(self.params.get('window_size'))
def fit(self, input_data: InputData):
""" Class doesn't support fit operation
Args:
input_data: data with features, target and ids to process
"""
pass
def transform(self, input_data: InputData) -> OutputData:
""" Method for computing the numerical derivative of the time series for the predict stage
Args:
input_data: data with features, target and ids to process
Returns:
output data with the differentiated time series
"""
source_ts = np.array(input_data.features)
# Apply differential operation
if input_data.data_type == DataTypesEnum.multi_ts:
full_differential_ts = []
for ts_n in range(source_ts.shape[1]):
ts = source_ts[:, ts_n]
differential_ts = self._differential_filter(ts)
full_differential_ts.append(differential_ts)
output_data = self._convert_to_output(input_data,
np.array(full_differential_ts).T,
data_type=input_data.data_type)
else:
differential_ts = np.ravel(self._differential_filter(source_ts))
output_data = self._convert_to_output(input_data,
differential_ts,
data_type=input_data.data_type)
return output_data
def _differential_filter(self, ts):
""":obj:`NumericalDerivative` filter
"""
if self.window_size > ts.shape[0]:
self.log.info(f'NumericalDerivativeFilter: invalid parameter window_size ({self.window_size}) changed to '
f'{self.poly_degree + 1}')
self.params.update(window_size=self.poly_degree + 1)
x = np.arange(ts.shape[0])
ts_len = x.shape[0]
der_f = np.zeros(ts_len)
# Take the differentials in the center of the domain
for center_window in range(self.window_size, ts_len - self.window_size):
points = np.arange(center_window - self.window_size, center_window + self.window_size)
# Fit to a Chebyshev polynomial
# this is the same as any polynomial since we're on a fixed grid but it's better conditioned :)
poly = np.polynomial.chebyshev.Chebyshev.fit(x[points], ts[points], self.poly_degree,
window=[np.min(points), np.max(points)])
der_f[center_window] = poly.deriv(m=self.order)(x[center_window])
supp_1 = ts[0:self.window_size]
coordsupp_1 = x[0:self.window_size]
supp_2 = ts[-self.window_size:]
coordsupp_2 = x[-self.window_size:]
poly = np.polynomial.chebyshev.Chebyshev.fit(coordsupp_1, supp_1, self.window_size - 1)
der_f[0:self.window_size] = poly.deriv(m=self.order)(coordsupp_1)
poly = np.polynomial.chebyshev.Chebyshev.fit(coordsupp_2, supp_2, self.window_size - 1)
der_f[-self.window_size:] = poly.deriv(m=self.order)(coordsupp_2)
for _ in range(self.order):
supp_1 = np.gradient(supp_1, coordsupp_1, edge_order=2)
supp_2 = np.gradient(supp_2, coordsupp_2, edge_order=2)
der_f[0:self.window_size] = supp_1
der_f[-self.window_size:] = supp_2
return np.transpose(der_f)
def _correct_params(self):
if self.poly_degree > self.max_poly_degree:
self.log.info(f'NumericalDerivativeFilter: invalid parameter poly_degree ({self.poly_degree}) '
f'changed to {self.max_poly_degree}')
self.params.update(poly_degree=self.max_poly_degree)
if self.order < 1:
self.log.info(f'NumericalDerivativeFilter: invalid parameter order ({self.order}) '
f'changed to {self.default_order}')
self.params.update(order=self.default_order)
if self.order >= self.poly_degree:
self.log.info(f'NumericalDerivativeFilter: invalid parameter poly_degree ({self.poly_degree}) '
f'changed to {self.order + 1}')
self.params.update(poly_degree=self.order + 1)
if self.window_size < self.poly_degree:
self.log.info(f'NumericalDerivativeFilter: invalid parameter window_size ({self.window_size}) changed to '
f'{self.poly_degree + 1}')
self.params.update(window_size=self.poly_degree + 1)
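# Illustrative sketch (not part of the original module): a single windowed
# Chebyshev fit and derivative evaluation, the core step of
# _differential_filter. For ts = x ** 2 the true derivative at x = 8 is 16.
def _example_chebyshev_derivative():
    x = np.arange(20, dtype=float)
    ts = x ** 2
    points = np.arange(5, 11)  # a window around x = 8
    poly = np.polynomial.chebyshev.Chebyshev.fit(x[points], ts[points], 2,
                                                 window=[np.min(points), np.max(points)])
    derivative_at_8 = poly.deriv(m=1)(8.0)  # close to 16.0
    return derivative_at_8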
class CutImplementation(DataOperationImplementation):
def __init__(self, params: Optional[OperationParameters]):
super().__init__(params)
# Define logger object before correcting parameters, since the
# correction may need to log
self.log = default_log(self)
self._correct_cut_part()
@property
def cut_part(self):
return self.params.get('cut_part')
def _correct_cut_part(self):
if not 0 < self.cut_part <= 0.9:
# Default parameter
self.log.info(f"Change invalid parameter cut_part ({self.cut_part}) on default value (0.5)")
self.params.update(cut_part=0.5)
def fit(self, input_data: InputData):
"""Class doesn't support fit operation
Args:
input_data: data with features, target and ids to process
"""
pass
def transform(self, input_data: InputData) -> OutputData:
""" Cut the first ``cut_part`` share from the time series\n
``new_len = len - int(self.cut_part * (input_values.shape[0]-horizon))``
Args:
input_data: data with features, target and ids to process
Returns:
output data with the cut time series
"""
old_target = deepcopy(input_data.target)
input_data = self._cut_input_data(input_data)
input_data.target = old_target
output_data = self._convert_to_output(input_data,
input_data.features,
data_type=input_data.data_type)
return output_data
def transform_for_fit(self, input_data: InputData) -> OutputData:
""" Cut the first ``cut_part`` share from the time series for the fit stage
``new_len = len - int(self.cut_part * (input_values.shape[0]-horizon))``
Args:
input_data: data with features, target and ids to process
Returns:
output data with the cut time series
"""
input_data = self._cut_input_data(input_data, reset_idx=True)
output_data = self._convert_to_output(input_data,
input_data.features,
data_type=input_data.data_type)
return output_data
def _cut_input_data(self, input_data: InputData, reset_idx: bool = False) -> InputData:
horizon = input_data.task.task_params.forecast_length
input_copy = copy(input_data)
input_values = input_copy.features
cut_len = int(self.cut_part * (input_values.shape[0] - horizon))
output_values = input_values[cut_len:]
input_copy.features = output_values
input_copy.target = output_values
if reset_idx:
input_copy.idx = np.arange(cut_len, input_values.shape[0])
return input_copy
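# Illustrative sketch (not part of the original module): the cut arithmetic
# used by _cut_input_data for a 100-point series, horizon 10, cut_part 0.5.
def _example_cut_len():
    input_values = np.arange(100)
    horizon, cut_part = 10, 0.5
    cut_len = int(cut_part * (input_values.shape[0] - horizon))  # 45
    output_values = input_values[cut_len:]  # 55 values remain for fitting
    return output_values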
def ts_to_table(idx, time_series: np.array, window_size: int, is_lag: bool = False):
"""Method converts the time series to lagged form.
Args:
idx: the indices of the time series to convert
time_series: source time series
window_size: size of sliding window, which defines lag
is_lag: whether the function is used for the lagged transformation;
``False`` is needed to convert a one-dimensional output to lagged form.
Returns:
``updated_idx`` -> clipped indices of time series\n
``features_columns`` -> lagged time series feature table
"""
features_columns = np.array([time_series[i:window_size + i]
for i in range(time_series.shape[0] - window_size + 1)])
if is_lag:
updated_idx = np.concatenate([idx[window_size:], idx[-1:]])
else:
updated_idx = idx[:len(idx) - window_size + 1]
return updated_idx, features_columns
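# Illustrative sketch (not part of the original module): the lagged table
# produced by ts_to_table for a toy series; the values follow directly from
# the slicing logic above.
def _example_ts_to_table():
    idx = np.arange(6)
    time_series = np.array([10, 20, 30, 40, 50, 60])
    updated_idx, features = ts_to_table(idx, time_series, window_size=3, is_lag=True)
    # features has shape (4, 3):
    # [[10, 20, 30], [20, 30, 40], [30, 40, 50], [40, 50, 60]]
    # updated_idx is [3, 4, 5, 5]: indices shifted by window_size plus a
    # repeated last element (removed later in prepare_target)
    return updated_idx, features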
def _sparse_matrix(logger, features_columns: np.array, n_components_perc=0.5, use_svd=False):
"""Method converts the matrix to sparse form
Args:
features_columns: matrix to sparse
n_components_perc: initial approximation of percent of components to keep
use_svd: whether to use the :obj:`SVD` method for the reduction or the naive column-skipping method
Returns:
reduced dimension matrix
Notes:
shape of returned matrix depends on the number of components which includes
the threshold of explained variance gain
"""
if not n_components_perc:
n_components_perc = 0.5
if use_svd:
# Getting the initial approximation of number of components
n_components = int(features_columns.shape[1] * n_components_perc)
if not n_components:
# At least one component must be kept (recomputing the same
# expression, as before, would still give zero)
n_components = 1
if n_components >= features_columns.shape[0]:
n_components = features_columns.shape[0] - 1
logger.info(f'Initial approximation of number of components set as {n_components}')
# Forming the first value of explained variance
components = _get_svd(features_columns, n_components)
else:
step = int(1 / n_components_perc)
indices_to_stay = np.arange(1, features_columns.shape[1], step)
components = np.take(features_columns, indices_to_stay, 1)
return components
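# Illustrative sketch (not part of the original module): the naive branch of
# _sparse_matrix with n_components_perc=0.5 keeps every second column starting
# from column 1 (step = int(1 / 0.5) = 2); the logger is unused on this
# branch, so None is passed here.
def _example_sparse_matrix_naive():
    matrix = np.arange(12).reshape(3, 4)
    reduced = _sparse_matrix(None, matrix, n_components_perc=0.5, use_svd=False)
    # reduced keeps columns 1 and 3 of the source matrix
    return reduced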
def _get_svd(features_columns: np.array, n_components: int):
"""Method reduces the matrix dimensionality via truncated :obj:`SVD`
Args:
features_columns: matrix to sparse
n_components: number of components to keep
Returns:
transformed sparse matrix
"""
svd = TruncatedSVD(n_components=n_components, n_iter=5, random_state=42)
svd.fit(features_columns.T)
components = svd.components_.T
return components
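# Illustrative sketch (not part of the original module): shape behaviour of
# the truncated SVD reduction above.
def _example_get_svd():
    matrix = np.random.rand(30, 10)  # 30 lagged rows, 10 lag columns
    components = _get_svd(matrix, n_components=4)
    # components has shape (30, 4): the rows are kept, the 10 lag columns
    # are compressed into 4 SVD components
    return components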
def prepare_target(all_idx, idx, features_columns: np.array, target, forecast_length: int):
"""Method converts the time series to lagged form. The transformation is applied
only for generating the target table (the time series is treated as a
multi-target regression task)
Args:
all_idx: all indices in data
idx: remaining indices after lagged feature table generation
features_columns: lagged feature table
target: source time series
forecast_length: forecast length
Returns:
``updated_idx``, ``updated_features``, ``updated_target``
.. details:: more information:
- ``updated_idx`` -> clipped indices of time series
- ``updated_features`` -> clipped lagged feature table
- ``updated_target`` -> lagged target table
"""
# Remove last repeated element
idx = idx[: -1]
# Update target (clip first "window size" values)
# be careful, some implementations may be dramatically slow
index_hash_table = {val: i for i, val in enumerate(all_idx)}
row_nums = [index_hash_table[i] for i in idx]
ts_target = target[row_nums]
# Multi-target transformation
if forecast_length > 1:
updated_target = np.array([ts_target[i:forecast_length + i]
for i in range(ts_target.shape[0] - forecast_length + 1)])
updated_idx = idx[: -forecast_length + 1]
updated_features = features_columns[: -forecast_length]
else:
# Forecast horizon equals to 1
updated_idx = idx
updated_features = features_columns[: -1]
updated_target = np.reshape(ts_target, (-1, 1))
return updated_idx, updated_features, updated_target
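# Illustrative sketch (not part of the original module): continuing the
# ts_to_table example above with forecast_length=2.
def _example_prepare_target():
    idx = np.arange(6)
    target = np.array([10, 20, 30, 40, 50, 60])
    new_idx, features = ts_to_table(idx, target, window_size=3, is_lag=True)
    upd_idx, upd_features, upd_target = prepare_target(all_idx=idx, idx=new_idx,
                                                       features_columns=features,
                                                       target=target,
                                                       forecast_length=2)
    # upd_features -> [[10, 20, 30], [20, 30, 40]]: the last rows are clipped
    # because no complete 2-element target exists for them
    # upd_target -> [[40, 50], [50, 60]]: the multi-target table
    # upd_idx -> [3, 4]
    return upd_idx, upd_features, upd_target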
def transform_features_and_target_into_lagged(input_data: InputData, forecast_length: int,
window_size: int):
"""Perform lagged transformation firstly on features and secondly on target array
Args:
input_data: dataclass with features
forecast_length: forecast horizon
window_size: window size for features transformation
Returns:
``new_idx``, ``transformed_cols``, ``new_target``
.. details:: more information:
- ``new_idx`` -> clipped indices of the time series
- ``transformed_cols`` -> lagged feature table
- ``new_target`` -> lagged target table
"""
new_idx, transformed_cols = ts_to_table(idx=input_data.idx,
time_series=input_data.features,
window_size=window_size,
is_lag=True)
new_idx, transformed_cols, new_target = prepare_target(all_idx=input_data.idx,
idx=new_idx,
features_columns=transformed_cols,
target=input_data.target,
forecast_length=forecast_length)
return new_idx, transformed_cols, new_target
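# Illustrative end-to-end sketch (not part of the original module): the
# module-level helper applied to a raw series wrapped into InputData; Task and
# TsForecastingParams are the standard FEDOT task dataclasses.
def _example_transform_into_lagged():
    from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams

    ts = np.arange(100, dtype=float)
    task = Task(TaskTypesEnum.ts_forecasting, TsForecastingParams(forecast_length=5))
    train_data = InputData(idx=np.arange(len(ts)), features=ts, target=ts,
                           task=task, data_type=DataTypesEnum.ts)
    new_idx, lagged_features, lagged_target = transform_features_and_target_into_lagged(
        train_data, forecast_length=5, window_size=10)
    # lagged_features has 10 columns (one per lag step),
    # lagged_target has 5 columns (one per forecast step)
    return new_idx, lagged_features, lagged_target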