Source code for hgraph._runtime._evaluation_engine

from abc import abstractmethod, ABC
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING

from hgraph._runtime._lifecycle import ComponentLifeCycle

if TYPE_CHECKING:
    from hgraph._runtime._evaluation_clock import EvaluationClock, EngineEvaluationClock
    from hgraph._runtime._node import Node
    from hgraph._runtime._graph import Graph

__all__ = ("EvaluationMode", "EvaluationLifeCycleObserver", "EvaluationEngine", "EvaluationEngineApi")

"""
The evaluation engine is split into parts, to clearly indicate the user accessible items vs internal
engine items.
"""


[docs] class EvaluationMode(Enum): """ The mode to use when executing the graph. The MODES are as follows: REAL_TIME The graph will run from start time until the graph is either requested to stop or is killed. If start time is in the past the graph will potentially replay historical PULL source nodes until it has caught up and will then continue from that point on once it catches up to the wall clock time it started. PUSH source nodes are only evaluated in REAL_TIME mode, these will only be started and evaluated in REAL_TIME mode. Events from PULL source nodes are only PULLED into the graph once we are caught up and are processed in a first come - first served basis. I.e. there is no cross node arrival time ordering of PUSH source nodes. Time ordering is managed within a PUSH source nodes output when the node is queueing, but the ordering is limited to when the received value is placed into the graph, not the time-delta between the events. SIMULATION The graph will run from start time to end time, all events to be processed are introduced from PULL source nodes and when there are no new events to process the graph is considered complete. In this mode the wall clock time (clock.now) is simulated and updated to be the next events time at the end of an evaluation cycle. PUSH source nodes are not supported in this mode of operation. """ REAL_TIME = 0 SIMULATION = 1
[docs] class EvaluationLifeCycleObserver: """ Provide the callbacks that can be received during the evaluation of the graph. Use this with care as each additional life-cycle observer will slow down the evaluation of the graph. """
[docs] def on_before_start_graph(self, graph: "Graph"): """ Called before the graph is started. """
[docs] def on_after_start_graph(self, graph: "Graph"): """ Called after the graph is started. """
[docs] def on_before_start_node(self, node: "Node"): """ Called before a node is started. """
[docs] def on_after_start_node(self, node: "Node"): """ Called after a node is started. """
[docs] def on_before_graph_evaluation(self, graph: "Graph"): """ Called before the graph is evaluated. """
[docs] def on_before_node_evaluation(self, node: "Node"): """ Called before a node is evaluated. """
[docs] def on_after_node_evaluation(self, node: "Node"): """ Called after a node is evaluated. """
[docs] def on_after_graph_push_nodes_evaluation(self, graph: "Graph"): """ Called after the graph has evaluated all its push nodes. """
[docs] def on_after_graph_evaluation(self, graph: "Graph"): """ Called after the graph is evaluated. """
[docs] def on_before_stop_node(self, node: "Node"): """ Called before a node is stopped. """
[docs] def on_after_stop_node(self, node: "Node"): """ Called after a node is stopped. """
[docs] def on_before_stop_graph(self, graph: "Graph"): """ Called before the graph is stopped. """
[docs] def on_after_stop_graph(self, graph: "Graph"): """ Called after the graph is stopped. """
[docs] class EvaluationEngineApi(ComponentLifeCycle, ABC): """ The user visible API for the evaluation engine. """ @property @abstractmethod def evaluation_mode(self) -> EvaluationMode: """ The current mode of evaluation """ @property @abstractmethod def start_time(self) -> datetime: """ The start time of the evaluation engine. """ @property @abstractmethod def end_time(self) -> datetime: """ The end time of the evaluation engine. """ @property @abstractmethod def evaluation_clock(self) -> "EvaluationClock": """ The evaluation clock. """
[docs] @abstractmethod def request_engine_stop(self): """ Request the evaluation engine to stop processing events and exit. This will not stop the graph immediately, and will only be processed after the current evaluation cycle has completed. """
@property @abstractmethod def is_stop_requested(self) -> bool: """ Returns True if the engine has been requested to stop. """
[docs] @abstractmethod def add_before_evaluation_notification(self, fn: callable): """ Add a before evaluation notification observer. The notification is called once before the next evaluation cycle. """
[docs] @abstractmethod def add_after_evaluation_notification(self, fn: callable): """ Add an after evaluation notification observer. The notification is called once after the evaluation of the current cycle. """
[docs] @abstractmethod def add_life_cycle_observer(self, observer: EvaluationLifeCycleObserver): """ Add a graph engine life-cycle observer. Life cycle events will immediately start to be delivered to the observer. The observer will continue to receive events until it is removed. """
[docs] @abstractmethod def remove_life_cycle_observer(self, observer: EvaluationLifeCycleObserver): """ Remove the provided life-cycle observer from the engine. This is immediately effective. """
class EvaluationEngine(EvaluationEngineApi, ABC): """ This extends the API to include the internal methods that are used to support the operations of an evaluation engine. This API is accessible to nested graph engines. Only the master graph engine implements the API. """ @property @abstractmethod def engine_evaluation_clock(self) -> "EngineEvaluationClock": """ The engine evaluation clock. This is used by the graph engine and nested graph engines. """ def advance_engine_time(self): """ Advance the engine time, this will deal with stopping the engine if we have reached the end time or if the engine has been requested to be stopped. """ def notify_before_evaluation(self): """ Notify observers before evaluation. This is on the outermost graph before any graphs are evaluated. """ def notify_after_evaluation(self): """ Notify observers after evaluation of the outermost graph, that is after all graphs have been evaluated. """ def notify_before_graph_evaluation(self, graph: "Graph"): """Notify observers before graph evaluation""" def notify_after_graph_evaluation(self, graph: "Graph"): """Notify observers after graph evaluation""" def notify_before_node_evaluation(self, node: "Node"): """Notify observers before node evaluation""" def notify_after_node_evaluation(self, node: "Node"): """Notify observers after node evaluation""" def notify_after_push_nodes_evaluation(self, graph): """Notify observers after the graph has evaluated all its push nodes""" def notify_before_start_graph(self, graph: "Graph"): """Notify observers that the graph is about to start""" def notify_after_start_graph(self, graph: "Graph"): """Notify observers that the graph has started""" def notify_before_stop_graph(self, graph: "Graph"): """Notify observers that the graph is about to stop""" def notify_after_stop_graph(self, graph: "Graph"): """ Notify observers that the graph has stopped """ def notify_before_start_node(self, node: "Node"): """ Notify observers that the node is about to start. """ def notify_after_start_node(self, node: "Node"): """ Notify observers that the node has started. """ def notify_before_stop_node(self, node: "Node"): """ Notify observers that the node is about to stop. """ def notify_after_stop_node(self, node: "Node"): """ Notify observers that the node has stopped. """ class EvaluationEngineDelegate(EvaluationEngine): """ A delegate that can be used to extend the evaluation engine with additional behaviour. This delegates all calls to the provided engine instance. """ def __init__(self, engine: EvaluationEngine): super().__init__() self._engine = engine @property def engine_evaluation_clock(self) -> "EngineEvaluationClock": return self._engine.engine_evaluation_clock @property def start_time(self) -> datetime: return self._engine.start_time @property def end_time(self) -> datetime: return self._engine.end_time @property def evaluation_mode(self) -> EvaluationMode: return self._engine.evaluation_mode @property def evaluation_clock(self) -> "EvaluationClock": return self._engine.evaluation_clock def request_engine_stop(self): self._engine.request_engine_stop() @property def is_stop_requested(self) -> bool: return self._engine.is_stop_requested def add_before_evaluation_notification(self, fn: callable): self._engine.add_before_evaluation_notification(fn) def add_after_evaluation_notification(self, fn: callable): self._engine.add_after_evaluation_notification(fn) def add_life_cycle_observer(self, observer: EvaluationLifeCycleObserver): self._engine.add_life_cycle_observer(observer) def remove_life_cycle_observer(self, observer: EvaluationLifeCycleObserver): self._engine.remove_life_cycle_observer(observer) def advance_engine_time(self): self._engine.advance_engine_time() def notify_before_evaluation(self): self._engine.notify_before_evaluation() def notify_after_evaluation(self): self._engine.notify_after_evaluation() def notify_before_graph_evaluation(self, graph: "Graph"): self._engine.notify_before_graph_evaluation(graph) def notify_after_graph_evaluation(self, graph: "Graph"): self._engine.notify_after_graph_evaluation(graph) def notify_before_node_evaluation(self, node: "Node"): self._engine.notify_before_node_evaluation(node) def notify_after_node_evaluation(self, node: "Node"): self._engine.notify_after_node_evaluation(node) def notify_before_start_graph(self, graph: "Graph"): self._engine.notify_before_start_graph(graph) def notify_after_start_graph(self, graph: "Graph"): self._engine.notify_after_start_graph(graph) def notify_before_stop_graph(self, graph: "Graph"): self._engine.notify_before_stop_graph(graph) def notify_after_stop_graph(self, graph: "Graph"): self._engine.notify_after_stop_graph(graph) def notify_before_start_node(self, node: "Node"): self._engine.notify_before_start_node(node) def notify_after_start_node(self, node: "Node"): self._engine.notify_after_start_node(node) def notify_before_stop_node(self, node: "Node"): self._engine.notify_before_stop_node(node) def notify_after_stop_node(self, node: "Node"): self._engine.notify_after_stop_node(node)