Source code for fedot.core.operations.operation

from abc import abstractmethod
from typing import Any, Dict, Optional, Union

from golem.core.log import default_log
from golem.serializers.serializer import register_serializable

from fedot.core.data.data import InputData, OutputData
from fedot.core.operations.hyperparameters_preprocessing import HyperparametersPreprocessor
from fedot.core.operations.operation_parameters import OperationParameters
from fedot.core.repository.operation_types_repository import OperationMetaInfo
from fedot.core.repository.operation_types_repository import OperationTypesRepository
from fedot.core.repository.tasks import Task, TaskTypesEnum, compatible_task_types
from fedot.utilities.custom_errors import AbstractMethodNotImplementError


[docs]@register_serializable class Operation: """Base class for operations in nodes. Operations could be machine learning (or statistical) models or data operations Args: operation_type: name of the operation """ def __init__(self, operation_type: str, **kwargs): self.operation_type = operation_type self._eval_strategy = None self.operations_repo: Optional[OperationTypesRepository] = None self.fitted_operation = None self.log = default_log(self) def _init(self, task: Task, **kwargs): params = kwargs.get('params') if not params: params = OperationParameters.from_operation_type(self.operation_type) if isinstance(params, dict): params = OperationParameters.from_operation_type(self.operation_type, **params) params_for_fit = HyperparametersPreprocessor(operation_type=self.operation_type, n_samples_data=kwargs.get('n_samples_data')) \ .correct(params.to_dict()) params_for_fit = OperationParameters.from_operation_type(self.operation_type, **params_for_fit) try: self._eval_strategy = \ _eval_strategy_for_task(self.operation_type, task.task_type, self.operations_repo)(self.operation_type, params_for_fit) except Exception as ex: self.log.error(f'Can not find evaluation strategy because of {ex}') raise ex if 'output_mode' in kwargs: self._eval_strategy.output_mode = kwargs['output_mode'] def description(self, operation_params: dict) -> str: operation_type = self.operation_type return f'n_{operation_type}_{operation_params}' @property def acceptable_task_types(self): operation_info = self.operations_repo.operation_info_by_id( self.operation_type) return operation_info.task_type @property def metadata(self) -> OperationMetaInfo: operation_info = self.operations_repo.operation_info_by_id(self.operation_type) if not operation_info: raise ValueError(f'{self.__class__.__name__} {self.operation_type} not found') return operation_info
[docs] def fit(self, params: Optional[Union[OperationParameters, dict]], data: InputData): """This method is used for defining and running of the evaluation strategy to train the operation with the data provided Args: params: hyperparameters for operation data: data used for operation training Returns: tuple: trained operation and prediction on train data """ self._init(data.task, params=params, n_samples_data=data.features.shape[0]) self.fitted_operation = self._eval_strategy.fit(train_data=data) predict_train = self.predict_for_fit(self.fitted_operation, data, params) return self.fitted_operation, predict_train
[docs] def predict(self, fitted_operation, data: InputData, params: Optional[Union[OperationParameters, dict]] = None, output_mode: str = 'default'): """This method is used for defining and running of the evaluation strategy to predict with the data provided Args: fitted_operation: trained operation object data: data used for prediction params: hyperparameters for operation output_mode: string with information about output of operation, for example, is the operation predict probabilities or class labels """ return self._predict(fitted_operation, data, params, output_mode, is_fit_stage=False)
[docs] def predict_for_fit(self, fitted_operation, data: InputData, params: Optional[OperationParameters] = None, output_mode: str = 'default'): """This method is used for defining and running of the evaluation strategy to predict with the data provided during fit stage Args: fitted_operation: trained operation object data: data used for prediction params: hyperparameters for operation output_mode: string with information about output of operation, for example, is the operation predict probabilities or class labels """ return self._predict(fitted_operation, data, params, output_mode, is_fit_stage=True)
def _predict(self, fitted_operation, data: InputData, params: Optional[OperationParameters] = None, output_mode: str = 'default', is_fit_stage: bool = False): is_main_target = data.supplementary_data.is_main_target data_flow_length = data.supplementary_data.data_flow_length self._init(data.task, output_mode=output_mode, params=params, n_samples_data=data.features.shape[0]) if is_fit_stage: prediction = self._eval_strategy.predict_for_fit( trained_operation=fitted_operation, predict_data=data) else: prediction = self._eval_strategy.predict( trained_operation=fitted_operation, predict_data=data) prediction = self.assign_tabular_column_types(prediction, output_mode) # any inplace operations here are dangerous! if is_main_target is False: prediction.supplementary_data.is_main_target = is_main_target prediction.supplementary_data.data_flow_length = data_flow_length return prediction
[docs] @staticmethod @abstractmethod def assign_tabular_column_types(output_data: OutputData, output_mode: str) -> OutputData: """Assign types for columns based on task and output_mode (for classification)\n For example, pipeline for solving time series forecasting task contains lagged and ridge operations. ``ts_type -> lagged -> tabular type``\n So, there is a need to assign column types to new data """ raise AbstractMethodNotImplementError
def __str__(self): return f'{self.operation_type}'
[docs] def to_json(self) -> Dict[str, Any]: """Serializes object and ignores unrelevant fields.""" return { k: v for k, v in sorted(vars(self).items()) if k not in ['log', 'operations_repo', '_eval_strategy', 'fitted_operation'] }
[docs]def _eval_strategy_for_task(operation_type: str, current_task_type: TaskTypesEnum, operations_repo: OperationTypesRepository): """The function returns the strategy for the selected operation and task type. And if it is necessary, found acceptable strategy for operation Args: operation_type: name of operation, for example, ``'ridge'`` current_task_type: task to solve operations_repo: repository with operations Returns: EvaluationStrategy: ``EvaluationStrategy`` class for this operation """ # Get acceptable task types for operation operation_info = operations_repo.operation_info_by_id(operation_type) if operation_info is None: raise ValueError(f'{operation_type} is not implemented ' f'in {operations_repo.repository_name}') acceptable_task_types = operation_info.task_type # If the operation can't be used directly for the task type from data set_acceptable_types = set(acceptable_task_types) if current_task_type not in acceptable_task_types: # Search the supplementary task types, that can be included in pipeline # which solves main task globally_compatible_task_types = compatible_task_types(current_task_type) globally_set = set(globally_compatible_task_types) comp_types_acceptable_for_operation = sorted(list(set_acceptable_types.intersection(globally_set))) if len(comp_types_acceptable_for_operation) == 0: raise ValueError(f'Operation {operation_type} can not be used as a part of {current_task_type}.') current_task_type = comp_types_acceptable_for_operation[0] strategy = operations_repo.operation_info_by_id(operation_type).current_strategy(current_task_type) return strategy