import os
from datetime import datetime
from typing import TYPE_CHECKING
from hgraph import EvaluationLifeCycleObserver, MAX_ET
from hgraph._runtime._constants import utc_now
if TYPE_CHECKING:
from hgraph import Graph, EvaluationClock
from hgraph import Node
__all__ = ("EvaluationProfiler",)
[docs]
class EvaluationProfiler(EvaluationLifeCycleObserver):
    """
    Prints out some useful metrics of the running graph; this can be used to help trace down memory leaks.

    Life-cycle events are written to stdout, prefixed with the wall-clock time and the
    evaluation time of the graph they belong to. Per-node memory growth is only reported
    when the optional ``psutil`` package can be imported; without it the node evaluation
    hooks do nothing.

    Takes the following inputs to configure:

    start
        Log start related events.

    eval
        Log eval related events.

    stop
        Log stop related events

    node
        Log node related events

    graph
        Log graph related events

    memory_threshold
        Minimum growth of the process resident set size (in MiB) over a node evaluation
        before that node is reported.
    """

    def __init__(
        self,
        start: bool = True,
        eval: bool = True,
        stop: bool = True,
        node: bool = True,
        graph: bool = True,
        *,
        memory_threshold: float = 0.1,
    ):
        """
        :param start: Log start related events.
        :param eval: Log node evaluation related events
        :param stop: Log stop related events.
        :param node: Log node events.
        :param graph: Log graph level events.
        :param memory_threshold: Resident-memory growth (MiB) a single node evaluation must
            exceed before it is printed.
        """
        self.start = start
        self.eval = eval
        self.stop = stop
        self.node = node
        self.graph = graph
        self.memory_threshold = memory_threshold
        # Resident memory (MiB) sampled just before the most recent node evaluation.
        # None until the first sample is taken, so the "after" hook can detect a
        # missing baseline instead of raising AttributeError.
        # NOTE(review): a single baseline is shared by all nodes; if node evaluations
        # nest (a node evaluating a child graph), the inner sample replaces the outer
        # one - confirm whether per-node baselines are wanted.
        self.mem = None
        try:
            import psutil

            self.process = psutil.Process(os.getpid())
        except ImportError:
            # psutil is optional; without it memory profiling is disabled.
            self.process = None

    def _print(self, evaluation_clock: "EvaluationClock", msg: str) -> None:
        """Print ``msg`` prefixed with the current wall-clock time and the clock's evaluation time."""
        print(f"[{utc_now()}][{evaluation_clock.evaluation_time}] {msg}")

    def _graph_name(self, graph: "Graph") -> str:
        """
        Build a bracketed, ``::``-separated path describing where ``graph`` is nested.

        Each segment is ``label:name<graph_id>`` of the node owning the graph, ordered
        outermost first. A graph without a parent node yields ``[]``.
        """
        segments = []
        # Walk outwards through owning nodes until a graph with no parent node is reached.
        while graph and (parent := graph.parent_node):
            signature = parent.signature
            label = f"{signature.label}:" if signature.label else ""
            graph_id = ", ".join(str(i) for i in graph.graph_id)
            segments.append(f"{label}{signature.name}<{graph_id}>")
            graph = parent.graph
        return f"[{'::'.join(reversed(segments))}]"

    def _print_graph(self, graph: "Graph", msg: str) -> None:
        """Print ``msg`` prefixed with the nesting path of ``graph``."""
        self._print(graph.evaluation_clock, f"{self._graph_name(graph)} {msg}")

    def _print_signature(self, node: "Node") -> None:
        """Print a "Starting" line carrying the full signature of ``node``."""
        self._print(
            node.graph.evaluation_clock,
            f"{self._graph_name(node.graph)} Starting: {node.signature.signature}",
        )

    def _print_node(self, node: "Node", msg: str) -> None:
        """Print ``msg`` prefixed with the graph path and the identity of ``node``."""
        signature = node.signature
        label = f"{signature.label}:" if signature.label else ""
        node_id = ", ".join(str(i) for i in node.node_id)
        # The identity is wrapped in a balanced pair of brackets; the previous text
        # opened "[" and ended with a dangling "(".
        node_signature = f"[{signature.wiring_path_name}.{label}{signature.name}<{node_id}>]"
        self._print(node.graph.evaluation_clock, f"{self._graph_name(node.graph)} {node_signature} {msg}")

    def on_before_start_graph(self, graph: "Graph") -> None:
        """Announce that ``graph`` is about to start (requires ``start`` and ``graph``)."""
        if self.start and self.graph:
            self._print_graph(graph, f">> {'.' * 15} Starting Graph {graph.label} {'.' * 15}")

    def on_after_start_graph(self, graph: "Graph") -> None:
        """Announce that ``graph`` has started (requires ``start`` and ``graph``)."""
        if self.start and self.graph:
            self._print_graph(graph, f"<< {'.' * 15} Started Graph {'.' * 15}")

    def on_before_start_node(self, node: "Node") -> None:
        """Print the signature of ``node`` before it starts (requires ``start`` and ``node``)."""
        if self.start and self.node:
            self._print_signature(node)

    def on_after_start_node(self, node: "Node") -> None:
        """Report that ``node`` has started (requires ``start`` and ``node``)."""
        if self.start and self.node:
            self._print_node(node, "Started node")

    def on_before_graph_evaluation(self, graph: "Graph") -> None:
        """Mark the beginning of an evaluation of ``graph`` (requires ``eval`` and ``graph``)."""
        if self.eval and self.graph:
            self._print_graph(graph, f"{'>' * 20} Eval Start {graph.label} {'>' * 20}")

    def on_before_node_evaluation(self, node: "Node") -> None:
        """Sample the process resident memory (MiB) as the baseline for the coming node evaluation."""
        if self.eval and self.node and self.process is not None:
            self.mem = self.process.memory_info().rss / 1024**2

    def on_after_node_evaluation(self, node: "Node") -> None:
        """
        Report ``node`` when resident memory grew by more than ``memory_threshold`` MiB.

        The printed pair is ``[growth, current]``, both in MiB. Nothing is printed when
        no baseline was sampled beforehand.
        """
        if not (self.eval and self.node and self.process is not None):
            return
        if self.mem is None:
            # No matching "before" sample (e.g. the flags were toggled mid-evaluation).
            return
        new_mem = self.process.memory_info().rss / 1024**2
        growth = new_mem - self.mem
        if growth > self.memory_threshold:
            self._print_node(node, f"[{growth}, {new_mem}]")

    def on_after_graph_evaluation(self, graph: "Graph") -> None:
        """
        Mark the end of an evaluation of ``graph`` and, where known, its next scheduled time.

        For a graph without a parent node the clock's next scheduled evaluation time is shown.
        For a nested graph the parent node's entry in its graph's schedule is shown, but only
        when it lies after the current evaluation time and before ``MAX_ET``.
        """
        if not (self.eval and self.graph):
            return
        parent = graph.parent_node
        if parent is None:
            next_scheduled = f" NEXT[{graph.evaluation_clock.next_scheduled_evaluation_time}]"
        else:
            nt = parent.graph.schedule[parent.node_ndx]
            if graph.evaluation_clock.evaluation_time < nt < MAX_ET:
                next_scheduled = f" NEXT[{nt}]"
            else:
                next_scheduled = ""
        self._print_graph(graph, f"{'<' * 20} Eval Done {'<' * 20}{next_scheduled}")

    def on_before_stop_node(self, node: "Node") -> None:
        """Intentionally silent; only the completion of a node stop is reported."""

    def on_after_stop_node(self, node: "Node") -> None:
        """Report that ``node`` has stopped (requires ``stop`` and ``node``)."""
        if self.stop and self.node:
            self._print_node(node, "Stopped node")

    def on_before_stop_graph(self, graph: "Graph") -> None:
        """Announce that ``graph`` is about to stop (requires ``stop`` and ``graph``)."""
        if self.stop and self.graph:
            self._print_graph(graph, "vvvvvvv Graph stopping -------")

    def on_after_stop_graph(self, graph: "Graph") -> None:
        """Announce that ``graph`` has stopped (requires ``stop`` and ``graph``)."""
        if self.stop and self.graph:
            self._print_graph(graph, "------- Graph stopped vvvvvvv")