"""Source code for ``hgraph._runtime._graph_runner``: graph configuration and the entry points that wire and evaluate a graph."""

import gc
import sys
import warnings
import time
from contextlib import nullcontext
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from logging import Logger, getLogger, DEBUG, StreamHandler, Formatter
from typing import Callable, Any

from typing_extensions import deprecated

from hgraph._runtime._constants import MIN_ST, MAX_ET, MIN_DT, utc_now
from hgraph._runtime._evaluation_engine import EvaluationMode, EvaluationLifeCycleObserver
from hgraph._runtime._global_state import GlobalState
from hgraph._runtime._graph_executor import GraphEngineFactory
from hgraph._wiring._wiring_observer import WiringObserver

__all__ = ("run_graph", "evaluate_graph", "GraphConfiguration", "node_path_log_formatter")


def _default_logger() -> Logger:
    """
    Return the ``'hgraph'`` logger, installing a stdout console handler if none is attached yet.

    When a handler is installed, both the logger and the handler are set to DEBUG and messages are formatted as
    ``"<time> [<logger name>][<level>] <message>"``.

    Side effects (applied on every call): Python warnings are redirected to ``warn_with_log`` via
    ``warnings.showwarning``, and ``DeprecationWarning`` is filtered to be reported only once.
    """
    hgraph_logger = getLogger("hgraph")
    needs_handler = not hgraph_logger.handlers
    if needs_handler:
        # Nothing has configured this logger yet, so provide a sensible console default.
        console = StreamHandler(sys.stdout)
        console.setLevel(DEBUG)
        console.setFormatter(Formatter("%(asctime)s [%(name)s][%(levelname)s] %(message)s"))
        hgraph_logger.setLevel(DEBUG)
        hgraph_logger.addHandler(console)
    # Route warnings through the hgraph logger instead of stderr.
    warnings.showwarning = warn_with_log
    warnings.filterwarnings("once", category=DeprecationWarning)
    return hgraph_logger


def node_path_log_formatter(
    level, msg, args, exc_info=None, extra=None, stack_info=False, stacklevel=1, node_path=None, __orig_log__=None
):
    """
    Prefix a log message with the path of the node that emitted it.

    This is an example ``logger_formatter`` (see ``GraphConfiguration``). The message becomes the node path,
    a colon, a newline and then the original message. All remaining arguments are forwarded unchanged, in
    ``Logger._log`` positional order, to ``__orig_log__`` (the original log method).

    :return: Whatever ``__orig_log__`` returns.
    """
    prefixed_msg = "{}:\n{}".format(node_path, msg)
    forwarded = (level, prefixed_msg, args, exc_info, extra, stack_info, stacklevel)
    return __orig_log__(*forwarded)


def warn_with_log(message, category, filename, lineno, file=None, line=None):
    """
    Drop-in replacement for ``warnings.showwarning`` that reports through the ``'hgraph'`` logger.

    The warning is logged at WARNING level as ``"<filename>:<lineno>: <CategoryName>: <message>"``.
    ``file`` and ``line`` exist only for signature compatibility with ``warnings.showwarning`` and are ignored.
    """
    text = "{}:{}: {}: {}".format(filename, lineno, category.__name__, message)
    getLogger("hgraph").warning(text)


@dataclass
class GraphConfiguration:
    """
    The configuration to be supplied to ``evaluate_graph``.

    The following properties are defined:

    run_mode
        Either ``REAL_TIME`` or ``SIMULATION``.

    start_time
        The first time to evaluate the engine for, this cannot be earlier than MIN_ST. When not supplied it
        defaults to MIN_ST in ``SIMULATION`` mode and to the current time (``utc_now()``) in ``REAL_TIME`` mode.

    end_time
        The last time to evaluate the engine for (inclusive), this cannot be later than MAX_ET. A ``timedelta``
        may be supplied instead: in ``SIMULATION`` mode it is added to ``start_time``, in ``REAL_TIME`` mode it
        is added to the current time.

    trace
        Turn on tracing by setting this to ``True``. It is also possible to be selective with tracing by setting
        this to a dict of the form ``{"start": False, "stop": False, "eval": False}``. Setting the value to be
        ``True`` will turn on tracing of this element, ``False`` will turn off tracing of the particular
        life-cycle. For more information on available options see: :class:`hgraph.test.EvaluationTrace`

    profile
        Similar to tracing, except setting this will turn on profiling of the graph. See
        :class:`hgraph.test.EvaluationProfiler` for more information as to the options.

    life_cycle_observers
        This allows for additional life-cycle observers to be registered. This should be supplied as a tuple of
        :class:`hgraph.EvaluationLifeCycleObserver` instances (any iterable, e.g. a list, is accepted and
        converted to a tuple).

    trace_wiring
        This indicates an interest in observing the wiring choices made during the wiring stage of the graph.
        As with tracing and profiling, a dictionary of options can also be supplied see
        :class:`hgraph.test.WiringTracer` for more information on the options.

    wiring_observers
        This allows for custom wiring observers to be registered. This should be supplied as a tuple of
        :class:`hgraph.WiringObserver` instances (any iterable is accepted and converted to a tuple).

    graph_logger
        The instance of the Python ``Logger`` to use for graph logging, by default an instance of the logger
        will be setup and registered under the 'hgraph' name. (Can be retrieved using ``getLogger('hgraph')``)

    trace_back_depth
        Used as a parameter to the error handling logic to determine the depth of traceback to capture.

    capture_values
        Capture the values of the inputs to the trace-back (default is False).

    default_log_level
        The default log level to use, the default is DEBUG. Any other value is applied to ``graph_logger``.

    logger_formatter
        Use to provide a custom formatter to override the default logger._log method. The node_path is supplied
        as an extra argument to the formatter as well as the original log method (as ``__orig_log__``). An
        example formatter is provided as ``node_path_log_formatter``.

    cleanup_on_error
        Passed through to the evaluation engine (``GraphEngineFactory.make``) as ``cleanup_on_error``
        (default ``True``).
    """

    run_mode: EvaluationMode = EvaluationMode.SIMULATION
    start_time: datetime = MIN_DT  # MIN_DT acts as the "not supplied" sentinel, resolved in __post_init__
    end_time: datetime | timedelta = MAX_ET  # a timedelta is converted to an absolute datetime in __post_init__
    trace: bool | dict = False
    profile: bool | dict = False
    life_cycle_observers: tuple[EvaluationLifeCycleObserver, ...] = tuple()
    trace_wiring: bool | dict = False
    wiring_observers: tuple[WiringObserver, ...] = tuple()
    graph_logger: Logger = field(default_factory=_default_logger)
    trace_back_depth: int = 1
    capture_values: bool = False
    default_log_level: int = DEBUG
    logger_formatter: Callable | None = None
    cleanup_on_error: bool = True

    def __post_init__(self):
        """
        Resolve defaults and relative times, validate the time range and register any requested observers.

        :raises RuntimeError: If ``start_time`` is before MIN_ST or ``end_time`` is after MAX_ET.
        """
        if self.start_time == MIN_DT:
            self.start_time = MIN_ST if self.run_mode is EvaluationMode.SIMULATION else utc_now()
        if self.start_time < MIN_ST:
            raise RuntimeError(f"Start time '{self.start_time}' is less than minimum time '{MIN_ST}'")
        if isinstance(self.end_time, timedelta):
            if self.run_mode is EvaluationMode.SIMULATION:
                self.end_time = self.start_time + self.end_time
            elif self.run_mode is EvaluationMode.REAL_TIME:
                self.end_time = utc_now() + self.end_time
        if self.end_time > MAX_ET:
            raise RuntimeError(f"End time '{self.end_time}' is greater than maximum time '{MAX_ET}'")

        # Normalise to tuples: callers (e.g. run_graph) may pass a list or None, and the observers added below
        # are appended with tuple concatenation, which would otherwise raise a TypeError.
        self.life_cycle_observers = tuple(self.life_cycle_observers or ())
        self.wiring_observers = tuple(self.wiring_observers or ())

        if self.trace:
            from hgraph.test import EvaluationTrace

            self.life_cycle_observers = self.life_cycle_observers + (
                EvaluationTrace(**(self.trace if isinstance(self.trace, dict) else {})),
            )
        if self.profile:
            from hgraph.test import EvaluationProfiler

            self.life_cycle_observers = self.life_cycle_observers + (
                EvaluationProfiler(**(self.profile if isinstance(self.profile, dict) else {})),
            )
        if self.trace_wiring:
            from hgraph.test import WiringTracer

            self.wiring_observers = self.wiring_observers + (
                WiringTracer(**(self.trace_wiring if isinstance(self.trace_wiring, dict) else {})),
            )
        if self.default_log_level != DEBUG:
            self.graph_logger.setLevel(self.default_log_level)

    @property
    def error_capture_options(self):
        """The error-capture keyword options (``trace_back_depth`` and ``capture_values``) as a dict."""
        return {"trace_back_depth": self.trace_back_depth, "capture_values": self.capture_values}
def evaluate_graph(graph: Callable, config: GraphConfiguration, *args, **kwargs) -> list[tuple[datetime, Any]] | None:
    """
    Wires the ``graph`` supplied, constructs the evaluation engine using the configuration supplied and starts
    the engine's run loop. If the ``graph`` has an output, this will collect the results in memory and return
    them at the end, once the run loop exits.

    .. note::

        Recording results can be memory intensive, don't use top level graphs with outputs unless you intend on
        using the results.

    :param graph: The graph to evaluate. An already constructed ``GraphBuilder`` is also accepted, in which case
                  no wiring is performed and ``None`` is returned.
    :param config: The configuration used to construct the evaluation engine.
    :param args: Any arguments to supply to the graph.
    :param kwargs: Any kwargs to supply to the graph.
    :return: The list of engine time and value for each tick returned by the graph if an output is present,
             None otherwise.
    :raises Exception: Any exception raised while running the engine is logged on ``config.graph_logger`` and
                       re-raised.
    """
    from hgraph._builder._graph_builder import GraphBuilder
    from hgraph._wiring._wiring_node_signature import WiringNodeSignature
    from hgraph._impl._operators._record_replay_in_memory import get_recorded_value

    # Re-use an existing GlobalState (e.g. one set up by the caller), otherwise create one for this run.
    with GlobalState() if not GlobalState.has_instance() else nullcontext():
        signature: WiringNodeSignature | None = None
        if isinstance(graph, GraphBuilder):
            graph_builder = graph
        else:
            start = time.perf_counter_ns()
            graph_builder, signature = _build_main_graph(graph, config, args, kwargs)
            config.graph_logger.debug(f"Graph wiring completed in {(time.perf_counter_ns() - start) / 1e9:.2f} seconds")

        config.graph_logger.debug("Creating graph engine: %s", config.run_mode)
        engine = GraphEngineFactory.make(
            graph_builder=graph_builder,
            run_mode=config.run_mode,
            observers=config.life_cycle_observers,
            cleanup_on_error=config.cleanup_on_error,
        )

        config.graph_logger.debug("Starting to run graph from: %s to %s", config.start_time, config.end_time)
        # Only undo our freeze if nothing was frozen beforehand; otherwise the caller (or an enclosing
        # evaluate_graph) owns the permanent generation and unfreezing would release its objects as well.
        owns_freeze = gc.get_freeze_count() == 0
        gc.collect()  # Clean up any garbage from wiring
        gc.freeze()  # Move the graph into the permanent generation so the cyclic GC skips it during evaluation
        try:
            GlobalState.instance()["__graph_logger__"] = config.graph_logger
            GlobalState.instance()["__graph_custom_formatter__"] = config.logger_formatter
            engine.run(config.start_time, config.end_time)
            if signature is not None and signature.output_type:
                return get_recorded_value("__out__")
        except Exception:
            config.graph_logger.exception("Graph failed")
            raise
        finally:
            if owns_freeze:
                # Without this, frozen (cyclic) graph structures are never reclaimed once the run is over,
                # leaking memory across repeated evaluations.
                gc.unfreeze()
            config.graph_logger.debug("Finished running graph")
def _build_main_graph(graph, config, args, kwargs):
    """
    Wire ``graph`` with ``args``/``kwargs`` using the error-capture options and wiring observers from ``config``.

    If the graph's signature declares an output, the graph is wrapped so that its output is recorded under the
    ``"__out__"`` key (which ``evaluate_graph`` reads back via ``get_recorded_value``).

    :return: A tuple of the wired ``GraphBuilder`` and the (unwrapped) graph's signature.
    """
    from hgraph._wiring._wiring_node_instance import WiringNodeInstanceContext
    from hgraph._wiring._graph_builder import wire_graph
    from hgraph._operators._record_replay import record
    from hgraph._builder._graph_builder import GraphBuilder

    config.graph_logger.debug("Wiring graph: %s", graph.signature.signature)
    signature = graph.signature
    if signature.output_type:
        graph_ = graph

        def _record(*args, **kwargs):
            out = graph_(*args, **kwargs)
            record(out, "__out__")

        graph = _record

    with WiringNodeInstanceContext(error_capture_options=config.error_capture_options):
        from hgraph._wiring._wiring_observer import WiringObserverContext

        with WiringObserverContext() as wiring_observer_context:
            for observer in config.wiring_observers:
                wiring_observer_context.add_wiring_observer(observer)
            graph_builder: GraphBuilder = wire_graph(graph, *args, **kwargs)
    return graph_builder, signature


@deprecated("Use evaluate_graph instead")
def run_graph(
    graph: Callable,
    *args,
    run_mode: EvaluationMode = EvaluationMode.SIMULATION,
    start_time: datetime | None = None,
    end_time: datetime | timedelta | None = None,
    print_progress: bool = True,
    life_cycle_observers: list[EvaluationLifeCycleObserver] | None = None,
    __trace__: bool | dict = False,
    __profile__: bool | dict = False,
    __trace_wiring__: bool | dict = False,
    __logger__: Logger = None,
    __trace_back_depth__: int = 1,
    __capture_values__: bool = False,
    **kwargs,
):
    """
    Use this to initiate the graph engine run loop.

    The run_mode indicates how the graph engine should evaluate the graph, in EvaluationMode.REAL_TIME the graph
    will be evaluated using the system clock, in EvaluationMode.SIMULATION the graph will be evaluated using a
    simulated clock. The simulated clock is advanced as fast as possible without following the system clock
    timings. This allows a back-test to be evaluated as fast as possible.

    All options are converted into a ``GraphConfiguration`` and the graph is run via ``evaluate_graph``.

    :param graph: The graph to evaluate
    :param args: Any arguments to pass to the graph
    :param run_mode: The mode to evaluate the graph in
    :param start_time: The time to start the graph
    :param end_time: The time to end the graph; forwarded as ``GraphConfiguration.end_time``, so a ``timedelta``
                     is interpreted relative to the start time (simulation) or to the current time (real-time).
    :param print_progress: Currently unused (retained for backwards compatibility).
    :param life_cycle_observers: A list of observers to register with the runtime engine prior to evaluation.
    :param kwargs: Any additional kwargs to pass to the graph.
    :return: The recorded output of the graph, as returned by ``evaluate_graph``.
    """
    kwargs_ = {
        "run_mode": run_mode,
        "trace": __trace__,
        "profile": __profile__,
        "trace_wiring": __trace_wiring__,
    }
    if __logger__ is not None:
        kwargs_["graph_logger"] = __logger__
    if start_time is not None:
        kwargs_["start_time"] = start_time
    if end_time is not None:
        kwargs_["end_time"] = end_time
    if life_cycle_observers is not None:
        # GraphConfiguration expects a tuple (it appends trace/profile observers with tuple concatenation).
        kwargs_["life_cycle_observers"] = tuple(life_cycle_observers)
    config = GraphConfiguration(**kwargs_, trace_back_depth=__trace_back_depth__, capture_values=__capture_values__)
    return evaluate_graph(graph, config, *args, **kwargs)