Source code for hgraph.test._node_printer

import logging
from typing import TYPE_CHECKING

from hgraph._runtime._constants import MAX_ET
from hgraph._types import HgSchedulerType, Injector
from hgraph._builder import Builder

if TYPE_CHECKING:
    from hgraph import Graph, EvaluationClock
    from hgraph import Node

__all__ = ("EvaluationTrace",)

from hgraph import EvaluationLifeCycleObserver

logger = logging.getLogger(__name__)


[docs] class EvaluationTrace(EvaluationLifeCycleObserver): """ Logs out the different steps as the engine evaluates the graph. This is voluminous but can be helpful tracing down unexpected behaviour. filter Used to restrict which node and graph events to report using a simple containment check of the node and its path. start Log start related events. eval Log eval related events. stop Log stop related events node Log node related events graph Log graph related events """ _PRINT_ALL_VALUES: bool = False _USE_LOGGER: bool = True
[docs] @staticmethod def set_print_all_values(value: bool): """To see all values (not only ticked ones) in the trace set this to True.""" EvaluationTrace._PRINT_ALL_VALUES = value
[docs] @staticmethod def set_use_logger(value: bool): """To use print instead of the logger set this to False.""" EvaluationTrace._USE_LOGGER = value
def __init__( self, filter: str = None, start: bool = True, eval: bool = True, stop: bool = True, node: bool = True, graph: bool = True, ): self.filter = filter self.start = start self.eval = eval self.stop = stop self.node = node self.graph = graph def _print(self, evaluation_clock: "EvaluationClock", msg: str) -> None: m = f"[{evaluation_clock.evaluation_time}] {msg}" if EvaluationTrace._USE_LOGGER: logger.info(m) else: print(m, flush=True) def _graph_name(self, graph: "Graph") -> str: graph_str = [] while graph: if graph.parent_node: graph_str.append( f"{graph.parent_node.signature.name}<{', '.join(str(i) for i in graph.graph_id)}>" + f"{(':' + graph.parent_node.signature.label) if graph.parent_node.signature.label else ''}" ) graph = graph.parent_node.graph else: graph = None return f"[{'::'.join(reversed(graph_str))}]" def _print_graph(self, graph: "Graph", msg: str) -> None: parent_details = self._graph_name(graph) self._print(graph.evaluation_clock, f"{parent_details} {msg}") def _print_signature(self, node: "Node"): node_signature = f"{node.signature.signature}" self._print(node.graph.evaluation_clock, f"{self._graph_name(node.graph)} Starting: {node_signature}") def _print_node( self, node: "Node", msg: str, add_input: bool = False, add_output: bool = False, add_scheduled_time: bool = False, ) -> None: node_signature = self._node_name(node) if node.signature.time_series_inputs: if add_input: inputs = node.inputs node_signature += ", ".join( f"{f'*{arg}*' if (mod_ := (in_ := inputs[arg]).modified) else arg}={(in_.delta_value if (mod_ or self._PRINT_ALL_VALUES) else '') if in_.valid else '<UnSet>'}" for arg in node.signature.time_series_inputs.keys() ) if node.signature.uses_scheduler: scheduler_arg, scheduler_value = [ (k, v) for k, v in node.signature.scalars.items() if isinstance(v, HgSchedulerType) ][0] node_signature += ( f", *{scheduler_arg}*" if node.scheduler.is_scheduled_now else f", {scheduler_arg}" ) else: node_signature += "..." node_signature += ")" if node.signature.time_series_output and add_output: mod_ = node.output.modified if node.output else False value_ = ( (node.output.delta_value if mod_ else node.output.value) if (node.output and node.output.valid) else "<UnSet>" ) node_signature += f"{' *->* ' if mod_ else ' -> '}{value_}" if add_scheduled_time: scheduled_msg = f" SCHED[{node.scheduler.next_scheduled_time}]" else: scheduled_msg = "" self._print( node.graph.evaluation_clock, f"{self._graph_name(node.graph)} {node_signature} {msg}{scheduled_msg}" ) def _node_name(self, node): node_signature = ( f"[{node.signature.wiring_path_name}." f"{(node.signature.label + ':') if node.signature.label else ''}" f"{node.signature.name}<{', '.join(str(i) for i in node.node_id)}>(" ) return node_signature
[docs] def on_before_start_graph(self, graph: "Graph"): if self.start and self.graph and (self.filter is None or self.filter in self._graph_name(graph)): self._print_graph(graph, f">> {'.' * 15} Starting Graph {graph.label} {'.' * 15}")
[docs] def on_after_start_graph(self, graph: "Graph"): if self.start and self.graph and (self.filter is None or self.filter in graph.label): self._print_graph(graph, f"<< {'.' * 15} Started Graph {'.' * 15}")
[docs] def on_before_start_node(self, node: "Node"): if self.start and self.node and (self.filter is None or self.filter in self._node_name(node)): self._print_signature(node)
[docs] def on_after_start_node(self, node: "Node"): if self.start and self.node and (self.filter is None or self.filter in self._node_name(node)): fmt_id = lambda id: f"<{', '.join(str(i) for i in id)}>" inputs = ", ".join( f"{k}: {fmt_id(n.output.owning_node.node_id) if n.output else '?'}" for k, n in node.inputs.items() ) scalars = ", ".join(f"{k}: {v}" for k, v in node.scalars.items() if not isinstance(v, (Injector, Builder))) and_ = " and " if inputs and scalars else "" self._print_node(node, f"Started node with {inputs}{and_}{scalars}", add_output=True)
[docs] def on_before_graph_evaluation(self, graph: "Graph"): if self.eval and self.graph and (self.filter is None or self.filter in self._graph_name(graph)): self._print_graph(graph, f"{'>' * 20} Eval Start {graph.label} {'>' * 20}")
[docs] def on_before_node_evaluation(self, node: "Node"): if node.signature.is_source_node: return if self.eval and self.node and (self.filter is None or self.filter in self._node_name(node)): self._print_node(node, "[IN]", add_input=True)
[docs] def on_after_node_evaluation(self, node: "Node"): if node.signature.is_sink_node: return if self.eval and self.node and (self.filter is None or self.filter in self._node_name(node)): self._print_node( node, "[OUT]", add_output=True, add_scheduled_time=node.signature.uses_scheduler and node.scheduler.next_scheduled_time == node.graph.schedule[node.node_ndx], )
[docs] def on_after_graph_evaluation(self, graph: "Graph"): if self.eval and self.graph and (self.filter is None or self.filter in self._graph_name(graph)): if ( graph.parent_node is not None and (nt := graph.parent_node.graph.schedule[graph.parent_node.node_ndx]) > graph.evaluation_clock.evaluation_time and nt < MAX_ET ): next_scheduled = f" NEXT[{nt}]" elif graph.parent_node is None: next_scheduled = f" NEXT[{graph.evaluation_clock.next_scheduled_evaluation_time}]" else: next_scheduled = "" self._print_graph(graph, f"{'<' * 20} Eval Done {'<' * 20}{next_scheduled}")
[docs] def on_before_stop_node(self, node: "Node"): # self._print_node(node, "Stopping node") ...
[docs] def on_after_stop_node(self, node: "Node"): if self.stop and self.node and (self.filter is None or self.filter in self._node_name(node)): self._print_node(node, "Stopped node")
[docs] def on_before_stop_graph(self, graph: "Graph"): if self.stop and self.graph and (self.filter is None or self.filter in self._graph_name(graph)): self._print_graph(graph, "vvvvvvv Graph stopping -------")
[docs] def on_after_stop_graph(self, graph: "Graph"): if self.stop and self.graph and (self.filter is None or self.filter in self._graph_name(graph)): self._print_graph(graph, "------- Graph stopped vvvvvvv")