from __future__ import annotations
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple, Union
import numpy as np
from golem.core.dag.linked_graph_node import LinkedGraphNode
from golem.core.log import default_log
from golem.core.optimisers.timer import Timer
from golem.serializers.serializer import register_serializable
from fedot.core.data.data import InputData, OutputData
from fedot.core.data.merge.data_merger import DataMerger
from fedot.core.operations.factory import OperationFactory
from fedot.core.operations.operation import Operation
from fedot.core.operations.operation_parameters import OperationParameters
from fedot.core.repository.operation_types_repository import OperationTypesRepository
from fedot.core.utils import DEFAULT_PARAMS_STUB, NESTED_PARAMS_LABEL
class PipelineNode(LinkedGraphNode):
    """The class defines the interface of nodes modifying the data flow in the :class:`Pipeline`

    Args:
        operation_type: operation defined in the operation repository
            (either its ``str`` name or an already built operation object)
        nodes_from: parent nodes where data comes from
        node_data: ``dict`` with :class:`InputData` for fit and predict stage
        kwargs: optional arguments; only the ``content`` key is read by the constructor
    """

    def __init__(self, operation_type: Optional[Union[str, Operation]] = None,
                 nodes_from: Optional[List[PipelineNode]] = None,
                 node_data: Optional[dict] = None,
                 **kwargs):
        # Was the data passed directly to the node or not
        self._node_data = {} if node_data is None else node_data
        self.direct_set = node_data is not None

        passed_content = kwargs.get('content')
        if passed_content:
            # Define operation, based on content dictionary
            operation = self._process_content_init(passed_content)
            params = passed_content.get('params', {})
            # NOTE(review): ``NodeMetadata`` is not defined in the visible part of this file -
            # presumably declared elsewhere in the module; confirm.
            self.metadata = passed_content.get('metadata', NodeMetadata())
        else:
            # There is no content for node: build the operation from ``operation_type``
            operation = self._process_direct_init(operation_type)
            # Define operation with default parameters
            params = {}
            self.metadata = NodeMetadata()

        self.fit_time_in_seconds = 0
        self.inference_time_in_seconds = 0

        super().__init__(content={'name': operation}, nodes_from=nodes_from)
        # use parameters.setter to process input parameters correctly
        # (it needs ``self.operation``, hence it runs after the base initialisation)
        self.parameters = params

        self.log = default_log(self)
        self._fitted_operation = None
        self.rating = None

    @property
    def is_primary(self) -> bool:
        """Tells whether the node has no parent nodes

        Returns:
            bool: ``True`` if :attr:`nodes_from` is ``None`` or empty and ``False`` otherwise
        """
        # An explicit boolean is returned: the implicit ``None`` for nodes with parents was error-prone
        return not self.nodes_from

    def _process_content_init(self, passed_content: dict) -> Operation:
        """Updates content in the node using the ``passed_content`` dictionary

        If ``passed_content['name']`` is a ``str`` it is replaced (in place, inside ``passed_content``)
        by the operation object built with :class:`OperationFactory`.

        Args:
            passed_content: content dictionary with obligatory ``'name'`` key

        Returns:
            Operation: operation object stored under the ``'name'`` key
        """
        if isinstance(passed_content['name'], str):
            # Need to convert name of operation into operation class object
            operation_factory = OperationFactory(operation_name=passed_content['name'])
            operation = operation_factory.get_operation()
            passed_content.update({'name': operation})
        else:
            operation = passed_content['name']
        self.content = passed_content
        return operation

    @staticmethod
    def _process_direct_init(operation_type: Optional[Union[str, Operation]]) -> Operation:
        """Define operation based on the direct ``operation_type`` without defining content in the node

        Args:
            operation_type: node type representation

        Returns:
            Operation: operation class object

        Raises:
            ValueError: if ``operation_type`` is ``None`` or any other falsy value
        """
        if not operation_type:
            raise ValueError('Operation is not defined in the node')

        if isinstance(operation_type, str):
            # Define appropriate operation or data operation by its name
            return OperationFactory(operation_name=operation_type).get_operation()

        # Already an operation object (e.g. AtomizedModel)
        return operation_type

    def update_params(self):
        """Merges the parameters changed by the fitted operation into the node :attr:`parameters`"""
        new_params = self.fitted_operation.get_params()
        changed_parameters = new_params.changed_parameters
        # Changed values take precedence over the ones currently stored in the node
        self.parameters = {**self.parameters, **changed_parameters}

    @property
    def name(self) -> str:
        """ Returns str name of operation """
        return self.operation.operation_type

    @property
    def operation(self) -> Operation:
        """Returns node operation object

        Returns:
            Operation: operation object stored in ``content['name']``
        """
        return self.content['name']

    @operation.setter
    def operation(self, value: Operation):
        """Updates ``operation`` property with the provided ``value``

        Args:
            value: new operation object
        """
        self.content.update({'name': value})

    @property
    def fitted_operation(self) -> Optional[Any]:
        """Returns already fitted operation if exists or ``None`` instead

        Returns:
            node fitted operation or ``None``
        """
        # The attribute may be absent: the setter removes it when ``None`` is assigned
        return getattr(self, '_fitted_operation', None)

    @fitted_operation.setter
    def fitted_operation(self, value: Any):
        """Sets node fitted operation with the provided ``value``

        Assigning ``None`` deletes the underlying ``_fitted_operation`` attribute (if present).

        Args:
            value: any model from the ``list`` of acceptable nodes for the chosen task and problem
        """
        if value is None:
            if hasattr(self, '_fitted_operation'):
                del self._fitted_operation
        else:
            self._fitted_operation = value

    def unfit(self):
        """Resets ``fitted_operation`` and ``node_data`` of the node

        Both values are passed to their setters as ``None``, which removes the underlying attributes.
        """
        self.fitted_operation = None
        # ``node_data`` is a property whose getter never fails, so no ``hasattr`` guard is required
        self.node_data = None

    def fit(self, input_data: InputData) -> OutputData:
        """Runs training process in the node

        If the node has no fitted operation yet, the operation is fitted and the elapsed time is stored in
        :attr:`fit_time_in_seconds`; otherwise the existing fitted operation is reused via ``predict_for_fit``.

        Args:
            input_data: data used for operation training

        Returns:
            OutputData: values predicted on the provided ``input_data``
        """
        self.log.debug(f'Trying to fit pipeline node with operation: {self.operation}')

        input_data = self._get_input_data(input_data=input_data, parent_operation='fit')

        if self.fitted_operation is None:
            with Timer() as t:
                self.fitted_operation, operation_predict = self.operation.fit(params=self._parameters,
                                                                              data=input_data)
                self.fit_time_in_seconds = round(t.seconds_from_start, 3)
        else:
            operation_predict = self.operation.predict_for_fit(fitted_operation=self.fitted_operation,
                                                               data=input_data,
                                                               params=self._parameters)

        # Update parameters after operation fitting (they can be corrected)
        not_atomized_operation = 'atomized' not in self.operation.operation_type
        if not_atomized_operation and 'correct_params' in self.operation.metadata.tags:
            self.update_params()
        return operation_predict

    def predict(self, input_data: InputData, output_mode: str = 'default') -> OutputData:
        """Runs prediction process in the node

        The elapsed time is stored in :attr:`inference_time_in_seconds`.

        Args:
            input_data: data used for prediction
            output_mode: desired output for operations (e.g. ``'labels'``, ``'probs'``, ``'full_probs'``)

        Returns:
            OutputData: values predicted on the provided ``input_data``
        """
        self.log.debug(f'Obtain prediction in pipeline node by operation: {self.operation}')

        input_data = self._get_input_data(input_data=input_data, parent_operation='predict')

        with Timer() as t:
            operation_predict = self.operation.predict(fitted_operation=self.fitted_operation,
                                                       params=self._parameters,
                                                       data=input_data,
                                                       output_mode=output_mode)
            self.inference_time_in_seconds = round(t.seconds_from_start, 3)
        return operation_predict

    def get_data_from_node(self) -> dict:
        """Returns data if it was set to the nodes directly

        Returns:
            dict: ``dict`` with :class:`InputData` for fit and predict stage
        """
        return self.node_data

    @property
    def node_data(self) -> dict:
        """Returns directly set :attr:`node_data`

        Returns:
            dict: ``dict`` with :class:`InputData` for fit and predict stage,
            or an empty ``dict`` if the underlying attribute is absent
        """
        return getattr(self, '_node_data', {})

    @node_data.setter
    def node_data(self, value: dict):
        """Sets :attr:`node_data`

        Assigning ``None`` deletes the underlying ``_node_data`` attribute (if present).

        Args:
            value: ``dict`` with :class:`InputData` for fit and predict stage
        """
        if value is None:
            if hasattr(self, '_node_data'):
                del self._node_data
        else:
            self._node_data = value

    def _get_input_data(self, input_data: InputData, parent_operation: str):
        """Chooses the data the node operation has to be applied to

        Args:
            input_data: data passed to the node from the outside
            parent_operation: stage name forwarded to the parent nodes (``'fit'`` or ``'predict'``)

        Returns:
            data obtained from the parent nodes if there are any; otherwise the directly set
            :attr:`node_data` (when :attr:`direct_set` is ``True``) or the ``input_data`` itself
        """
        if self.nodes_from:
            # NOTE(review): ``_input_from_parents`` is not defined in the visible part of this class -
            # presumably provided elsewhere; confirm.
            return self._input_from_parents(input_data=input_data, parent_operation=parent_operation)

        if self.direct_set:
            # Data attached to the node has priority over the passed one
            return self.node_data

        # Remember the data the node without parents was called with
        self.node_data = input_data
        return input_data

    def _nodes_from_with_fixed_order(self):
        """Sorts :attr:`nodes_from` by the nodes ``descriptive_id``

        Returns:
            new ``list`` with the nodes of :attr:`nodes_from` sorted by ``descriptive_id``
        """
        return sorted(self.nodes_from, key=lambda node: node.descriptive_id)

    @property
    def parameters(self) -> dict:
        """Returns node custom parameters

        Returns:
            dict: of custom parameters stored in ``content['params']``
        """
        return self.content.get('params')

    @parameters.setter
    def parameters(self, params: dict):
        """Sets custom parameters of the node or set default

        ``None`` is ignored: neither ``_parameters`` nor ``content['params']`` is touched in that case.

        Args:
            params: new parameters to be placed instead of existing
        """
        if params is None:
            return

        # The check for "default_params" is needed for backward compatibility.
        if params == DEFAULT_PARAMS_STUB:
            params = {}
        # take nested params if they appeared (mostly used for tuning)
        if NESTED_PARAMS_LABEL in params:
            params = params[NESTED_PARAMS_LABEL]
        self._parameters = OperationParameters.from_operation_type(self.operation.operation_type, **params)
        self.content['params'] = self._parameters.to_dict()

    def __str__(self) -> str:
        """Returns ``str`` representation of the node

        Returns:
            str: string field node operation type
        """
        return str(self.operation.operation_type)

    @property
    def tags(self) -> Optional[List[str]]:
        """Returns tags of operation in the node

        Returns:
            Optional[List[str]]: ``empty list`` if node is of atomized type, ``list of tags`` if the
            operation is found in the operation repository and ``None`` otherwise
        """
        operation_type = self.operation.operation_type
        if 'atomized' in operation_type:
            # There are no tags for atomized operation
            return []

        info = OperationTypesRepository(operation_type='all').operation_info_by_id(operation_type)
        return info.tags if info is not None else None
def _combine_parents(parent_nodes: List[PipelineNode],
                     input_data: Optional[InputData], parent_operation: str) -> Tuple[List[OutputData], np.array]:
    """ Combines predictions from the ``parent_nodes``

    Args:
        parent_nodes: list of parent nodes, from which predictions will be combined
        input_data: input data from pipeline abstraction (source input data)
        parent_operation: name of parent operation (``'fit'`` or ``'predict'``)

    Returns:
        Tuple[List[OutputData], np.array]: :obj:`output data list from parent nodes`,
        :obj:`target for final pipeline prediction`. The target is taken from ``input_data`` when it is
        provided and from the output of the last parent otherwise; it is ``None`` when there is
        neither ``input_data`` nor any parent node.

    Raises:
        ValueError: if a parent node has to be processed and ``parent_operation`` is neither
            ``'fit'`` nor ``'predict'``
    """
    # InputData was set to pipeline -> its target is used; the ``None`` default keeps ``target``
    # defined even if there are no parents to take it from
    target = input_data.target if input_data is not None else None

    parent_results = []
    for parent in parent_nodes:
        if parent_operation == 'predict':
            prediction = parent.predict(input_data=input_data)
        elif parent_operation == 'fit':
            prediction = parent.fit(input_data=input_data)
        else:
            raise ValueError("Value parent_operation should be 'fit' or 'predict'")
        parent_results.append(prediction)

        if input_data is None:
            # InputData was set to primary nodes
            target = prediction.target

    return parent_results, target
# TODO: these aliases are kept for backwards compatibility only.
#  They should be removed and replaced by a script that converts old-style pipelines
#  (with PrimaryNode and SecondaryNode) to new-style ones (only with PipelineNode).
PrimaryNode = SecondaryNode = PipelineNode