Source code for hgraph._runtime._node

from abc import abstractmethod, ABC
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum, IntFlag, auto
from typing import Optional, Mapping, TYPE_CHECKING, Any

from hgraph._runtime._lifecycle import ComponentLifeCycle

if TYPE_CHECKING:
    from hgraph._types import HgTimeSeriesTypeMetaData, HgScalarTypeMetaData, HgRecordableStateType, RECORDABLE_STATE
    from hgraph._types._time_series_types import TimeSeriesInput, TimeSeriesOutput
    from hgraph._types._tsb_type import TimeSeriesBundleInput
    from hgraph._runtime._graph import Graph
    from hgraph._wiring._source_code_details import SourceCodeDetails

__all__ = (
    "Node",
    "NODE",
    "NodeTypeEnum",
    "NodeSignature",
    "SCHEDULER",
    "InjectableTypesEnum",
    "NodeDelegate",
    "ERROR_PATH",
    "STATE_PATH",
)


class NodeTypeEnum(Enum):
    PUSH_SOURCE_NODE = 0
    PULL_SOURCE_NODE = 1
    COMPUTE_NODE = 2
    SINK_NODE = 3


[docs] class InjectableTypesEnum(IntFlag): NONE = 0 STATE = auto() """This node is stateful, provide a state instance here""" RECORDABLE_STATE = auto() SCHEDULER = auto() """This node will schedule itself outside of its defined inputs for evaluation""" OUTPUT = auto() """Access to the output is required""" CLOCK = auto() """Needs to have access to the current engine clock for it's logic""" ENGINE_API = auto() """Needs access to the engine API, typically start and end times for the engine""" LOGGER = auto() """This node produces logging output""" NODE = auto() """Requires access to the node, this should generally be reserved for use in framework nodes""" TRAIT = auto()
@dataclass class NodeSignature: """ This is the generic node signature that can be referenced by all instances of the node. The resolved scalar values are stored on the instance only. """ name: str node_type: NodeTypeEnum args: tuple[str, ...] time_series_inputs: Optional[Mapping[str, "HgTimeSeriesTypeMetaData"]] time_series_output: Optional["HgTimeSeriesTypeMetaData"] scalars: Optional[Mapping[str, "HgScalarTypeMetaData"]] src_location: "SourceCodeDetails" active_inputs: frozenset[str] | None = None valid_inputs: frozenset[str] | None = None all_valid_inputs: frozenset[str] | None = None context_inputs: frozenset[str] | None = None injectable_inputs: Mapping[str, InjectableTypesEnum] | None = None injectables: int = 0 capture_exception: bool = False trace_back_depth: int = 1 wiring_path_name: str = "" label: str = "" capture_values: bool = False record_replay_id: str = "" has_nested_graphs: bool = False @property def uses_scheduler(self) -> bool: return (InjectableTypesEnum.SCHEDULER.value & self.injectables) != 0 @property def uses_clock(self) -> bool: return (InjectableTypesEnum.CLOCK.value & self.injectables) != 0 @property def uses_engine(self) -> bool: return (InjectableTypesEnum.ENGINE_API.value & self.injectables) != 0 @property def uses_state(self) -> bool: return (InjectableTypesEnum.STATE.value & self.injectables) != 0 @property def uses_recordable_state(self) -> bool: return (InjectableTypesEnum.RECORDABLE_STATE.value & self.injectables) != 0 def _recordable_state(self) -> tuple[str, "HgRecordableStateType"] | tuple[None, None]: from hgraph._types._scalar_type_meta_data import HgRecordableStateType return next(((arg, tp) for arg, tp in self.scalars.items() if type(tp) is HgRecordableStateType), (None, None)) @property def recordable_state_arg(self) -> str | None: return self._recordable_state()[0] @property def recordable_state(self) -> "HgRecordableStateType": return self._recordable_state()[1] @property def uses_output_feedback(self) -> bool: return (InjectableTypesEnum.OUTPUT & self.injectables) != 0 @property def is_source_node(self) -> bool: return self.node_type in (NodeTypeEnum.PUSH_SOURCE_NODE, NodeTypeEnum.PULL_SOURCE_NODE) @property def is_pull_source_node(self) -> bool: return self.node_type is NodeTypeEnum.PULL_SOURCE_NODE @property def is_push_source_node(self) -> bool: return self.node_type is NodeTypeEnum.PUSH_SOURCE_NODE @property def is_compute_node(self) -> bool: return self.node_type is NodeTypeEnum.COMPUTE_NODE @property def is_sink_node(self) -> bool: return self.node_type is NodeTypeEnum.SINK_NODE @property def is_recordable(self) -> bool: return bool(self.record_replay_id) @property def signature(self) -> str: input_types = (self.time_series_inputs or {}) | (self.scalars or {}) args = (f"{arg}: {input_types[arg].dereference()}" for arg in self.args) return_ = "" if self.time_series_output is None else f" -> {self.time_series_output.dereference()}" return f"{self.name}({', '.join(args)}){return_}" def to_dict(self) -> dict[str, Any]: return dict( name=self.name, node_type=self.node_type, args=self.args, time_series_inputs=self.time_series_inputs, time_series_output=self.time_series_output, scalars=self.scalars, src_location=self.src_location, active_inputs=self.active_inputs, valid_inputs=self.valid_inputs, all_valid_inputs=self.all_valid_inputs, context_inputs=self.context_inputs, injectable_inputs=self.injectable_inputs, injectables=self.injectables, capture_exception=self.capture_exception, trace_back_depth=self.trace_back_depth, wiring_path_name=self.wiring_path_name, label=self.label, capture_values=self.capture_values, record_replay_id=self.record_replay_id, has_nested_graphs=self.has_nested_graphs, ) def copy_with(self, **kwargs) -> "NodeSignature": kwargs_ = self.to_dict() | kwargs return NodeSignature(**kwargs_) ERROR_PATH: int = -1 # The path in the wiring edges representing the error output of the node STATE_PATH: int = -2 # The path in the wiring edges representing the recordable state output of the node class Node(ComponentLifeCycle, ABC): @property @abstractmethod def node_ndx(self) -> int: """ The relative index of this node within the parent graph's list of nodes. """ @property @abstractmethod def owning_graph_id(self) -> tuple[int, ...]: """ The path from the root graph to the graph containing this node. This is effectively the node_id less the last entry. Thus, the root graph is referenced as (), the first child if (node_ndx of nested_1), ... """ @property @abstractmethod def node_id(self) -> tuple[int, ...]: """ The unique path reference to this node from the root graph running in the system. For a node directly attached to the root graph, the path will be: (node_ndx) For a node within a nested graph structure, it will be something like: (node_ndx of nested_1, ..., node_ndx of nested_n, node_ndx) For nodes with a dynamic nested structure such as a branch, a unique id (integer) is allocated to a branch key and this id is used to represent the key in the path. This is similar to the categorical concept in dataframes. """ @property @abstractmethod def signature(self) -> NodeSignature: """ The signature of the Node provides useful information to describe the node. This can be used for exception and debugging purposes. """ @property @abstractmethod def scalars(self) -> Mapping[str, Any]: """ The scalar values associated to this node. These are the values that are not time-series. """ @property @abstractmethod def graph(self) -> "Graph": """ The graph that this node is a member of. """ @graph.setter @abstractmethod def graph(self, value: "Graph"): """ The graph that this node is a member of. """ @property @abstractmethod def input(self) -> Optional["TimeSeriesBundleInput"]: """ The input as an Unnamed Bundle. This allows the input to be considered as a TSB which is helpful for standardising handling of inputs. The bundle schema is the collection of inputs that are of time-series types. """ @input.setter @abstractmethod def input(self, value: "TimeSeriesBundleInput"): """ This should only be set by the owning graph or a very clever manager. """ @property @abstractmethod def inputs(self) -> Optional[Mapping[str, "TimeSeriesInput"]]: """ The inputs associated to this node. """ @property @abstractmethod def start_inputs(self) -> list["TimeSeriesInput"]: """ The inputs scheduled for start callback """ @property @abstractmethod def output(self) -> Optional["TimeSeriesOutput"]: """ The output of this node. This could be a TimeSeriesBundleOutput or a single output value. """ @output.setter @abstractmethod def output(self, value: "TimeSeriesOutput"): """ This should only be set by the owning graph or a very clever manager. """ @property @abstractmethod def recordable_state(self) -> Optional["RECORDABLE_STATE"]: """ The recordable state associated to this node. """ @recordable_state.setter @abstractmethod def recordable_state(self, value: "RECORDABLE_STATE"): """Set the recordable state instance for this node if present.""" @property @abstractmethod def scheduler(self) -> "SCHEDULER": """ The scheduler for this node. """ @abstractmethod def eval(self): """Called by the graph evaluation engine when the node has been scheduled for evaluation.""" @abstractmethod def notify(self, modified_time: datetime): """Notify the node that it is need of scheduling""" @abstractmethod def notify_next_cycle(self): """Notify the node to be evaluated in the next evaluation cycle""" @property @abstractmethod def error_output(self) -> Optional["TimeSeriesOutput"]: """ An error output of this node. This will tick when the eval method produces an exception instead of a result. This is only available when the node is marked as error checking. """ class NodeDelegate(Node): """Wraps a node delegating all node methods to the underlying implementation.""" def __init__(self, node: Node): super().__init__() self._node = node @property def node_ndx(self) -> int: return self._node.node_ndx @property def owning_graph_id(self) -> tuple[int, ...]: return self._node.owning_graph_id @property def node_id(self) -> tuple[int, ...]: return self._node.node_id @property def signature(self) -> NodeSignature: return self._node.signature @property def scalars(self) -> Mapping[str, Any]: return self._node.scalars @property def graph(self) -> "Graph": return self._node.graph @property def input(self) -> Optional["TimeSeriesBundleInput"]: return self._node.input @property def inputs(self) -> Optional[Mapping[str, "TimeSeriesInput"]]: return self._node.inputs @property def output(self) -> Optional["TimeSeriesOutput"]: return self._node.output @property def error_output(self) -> Optional["TimeSeriesOutput"]: return self._node.error_output @property def scheduler(self) -> "SCHEDULER": return self._node.scheduler def eval(self): self._node.eval() def notify(self, modified_time): self._node.notify(modified_time) def notify_next_cycle(self): self._node.notify_next_cycle()
[docs] class SCHEDULER(ABC): """ An input that is scheduled to be evaluated at a particular time. This is used for time-series inputs that are not bound to an output, but are still required to be evaluated at a particular time. """ @property @abstractmethod def next_scheduled_time(self) -> datetime: """ The time that this input is scheduled to be evaluated. """ @property @abstractmethod def is_scheduled(self) -> bool: """ Are there any pending scheduling events. """ @property @abstractmethod def is_scheduled_now(self) -> bool: """ Was this scheduled in this engine cycle. That is, was the node scheduled for evaluation at the current engine_time. """
[docs] @abstractmethod def has_tag(self, tag: str) -> bool: """ Does this scheduler have the tag specified. """
[docs] @abstractmethod def pop_tag(self, tag: str, default: datetime = None) -> datetime | None: """ Removes the tag and returns the value associated to it. If the tag is not found, then the default value is returned. """
[docs] @abstractmethod def schedule(self, when: datetime | timedelta, tag: str = None, on_wall_clock: bool = False): """ Schedule the node to be evaluated at the time specified. If tag is set, then the scheduled event will be associated to the tag, if a schedule is already set against the tag, it will be replaced with the new entry. """
[docs] @abstractmethod def un_schedule(self, tag: str = None): """ If tag is set, this will remove the scheduled event associated with this tag, if there is nothing scheduled for the tag, nothing is done. If the tag is not set, then remove the next scheduled item. """
[docs] @abstractmethod def reset(self): """ Remove all scheduled events. """
NODE = Node