Graph Run Loop
The graph run loop consists of a configuration and a run loop function.
- class EvaluationMode(*values)[source]
The mode to use when executing the graph.
The MODES are as follows:
- REAL_TIME
The graph will run from start time until the graph is either requested to stop or is killed. If start time is in the past, the graph will potentially replay historical PULL source nodes until it catches up to the wall clock time at which it started, and will then continue from that point on. PUSH source nodes are only started and evaluated in REAL_TIME mode. Events from PUSH source nodes are only introduced into the graph once it has caught up, and are processed on a first come, first served basis, i.e. there is no cross-node arrival time ordering of PUSH source nodes. Time ordering is managed within a PUSH source node's output when the node is queueing, but the ordering is limited to when the received value is placed into the graph, not the time-delta between the events.
- SIMULATION
The graph will run from start time to end time. All events to be processed are introduced from PULL source nodes, and when there are no new events to process the graph is considered complete. In this mode the wall clock time (clock.now) is simulated and is updated to the next event's time at the end of an evaluation cycle. PUSH source nodes are not supported in this mode of operation.
- REAL_TIME = 0
- SIMULATION = 1
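The SIMULATION behaviour described above can be pictured with a small toy loop. This is only a sketch using the standard library, not hgraph's engine: `run_simulation` and its `events` argument are hypothetical names, and the list of `(time, value)` pairs stands in for whatever the PULL source nodes would introduce.

```python
import heapq
from datetime import datetime, timedelta


def run_simulation(events, start_time, end_time):
    """Toy SIMULATION loop: evaluate (time, value) events in time order.

    Returns one (engine_time, values) entry per evaluation cycle.
    """
    # Keep only events inside [start_time, end_time]; end_time is inclusive.
    # The index i breaks ties so the values themselves are never compared.
    queue = [(t, i, v) for i, (t, v) in enumerate(events) if start_time <= t <= end_time]
    heapq.heapify(queue)

    cycles = []
    while queue:  # no events left means the run is complete
        now = queue[0][0]  # the simulated clock jumps to the next event time
        batch = []
        while queue and queue[0][0] == now:
            batch.append(heapq.heappop(queue)[2])
        cycles.append((now, batch))
    return cycles


start = datetime(2024, 1, 1)
events = [
    (start + timedelta(seconds=5), "b"),
    (start, "a"),
    (start + timedelta(seconds=5), "c"),
    (start + timedelta(days=2), "after end_time"),
]
cycles = run_simulation(events, start, start + timedelta(days=1))
```

Note that no real time passes between cycles: the clock is simply set to the time of the next queued event, which is why a simulated run over a long period can finish quickly.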
- class GraphConfiguration(run_mode: EvaluationMode = EvaluationMode.SIMULATION, start_time: datetime = datetime.datetime(1970, 1, 1, 0, 0), end_time: datetime = datetime.datetime(2299, 12, 31, 23, 59, 59, 999999), trace: bool | dict = False, profile: bool | dict = False, life_cycle_observers: tuple[~hgraph._runtime._evaluation_engine.EvaluationLifeCycleObserver, ...]=(), trace_wiring: bool | dict = False, wiring_observers: tuple[~hgraph._wiring._wiring_observer.WiringObserver, ...]=(), graph_logger: Logger = <factory>, trace_back_depth: int = 1, capture_values: bool = False, default_log_level: int = 10, logger_formatter: Callable = None, cleanup_on_error: bool = True)[source]
The configuration to be supplied to evaluate_graph. The following properties are defined:
- run_mode
Either REAL_TIME or SIMULATION.
- start_time
The first time to evaluate the engine for, this cannot be earlier than MIN_ST.
- end_time
The last time to evaluate the engine for (inclusive), this cannot be later than MAX_ET.
- trace
Turn on tracing by setting this to True. It is also possible to be selective with tracing by setting this to a dict of the form {"start": False, "stop": False, "eval": False}. Setting the value to be True will turn on tracing of this element, False will turn off tracing of the particular life-cycle. For more information on available options see: hgraph.test.EvaluationTrace
- profile
Similar to tracing, except setting this will turn on profiling of the graph. See hgraph.test.EvaluationProfiler for more information as to the options.
- life_cycle_observers
This allows for additional life-cycle observers to be registered. This should be supplied as a tuple of hgraph.EvaluationLifeCycleObserver instances.
- trace_wiring
This indicates an interest in observing the wiring choices made during the wiring stage of the graph. As with tracing and profiling, a dictionary of options can also be supplied, see hgraph.test.WiringTracer for more information on the options.
- wiring_observers
This allows for custom wiring observers to be registered. This should be supplied as a tuple of hgraph.WiringObserver instances.
- graph_logger
The instance of the Python Logger to use for graph logging, by default an instance of the logger will be set up and registered under the 'hgraph' name. (Can be retrieved using getLogger('hgraph'))
- trace_back_depth
Used as a parameter to the error handling logic to determine the depth of traceback to capture.
- capture_values
Capture the values of the inputs to the trace-back (default is False).
- default_log_level
The default log level to use, the default is DEBUG.
- logger_formatter
Use to provide a custom formatter to override the default logger._log method. The node_path is supplied as an extra argument to the formatter as well as the original log method (as __orig_log__). An example formatter is provided as node_path_log_formatter.
- property error_capture_options
- life_cycle_observers: tuple[EvaluationLifeCycleObserver, ...] = ()
- run_mode: EvaluationMode = 1
- wiring_observers: tuple[WiringObserver, ...] = ()
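To make the `bool | dict` form of the `trace` option concrete, here is a minimal sketch of how such a setting could be expanded into explicit flags. `resolve_trace_options` is a hypothetical helper, not part of hgraph; the option names and defaults are copied from the `EvaluationTrace` signature, and the assumption that unspecified keys keep those defaults is mine.

```python
def resolve_trace_options(trace):
    """Expand a `bool | dict` trace setting into explicit flags.

    Returns None when tracing is off, otherwise a dict of options.
    """
    # Defaults mirror the EvaluationTrace constructor signature.
    defaults = {"filter": None, "start": True, "eval": True,
                "stop": True, "node": True, "graph": True}
    if isinstance(trace, bool):
        return dict(defaults) if trace else None
    unknown = set(trace) - set(defaults)
    if unknown:
        raise ValueError(f"Unknown trace options: {sorted(unknown)}")
    # Assumption: keys that are not supplied keep their default value.
    return {**defaults, **trace}


selective = resolve_trace_options({"start": False, "stop": False, "eval": False})
```

With the dictionary from the text, `selective` has the start, stop and eval flags switched off while node and graph reporting stay on.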
- evaluate_graph(graph: Callable, config: GraphConfiguration, *args, **kwargs) -> list[tuple[datetime, Any]] | None[source]
Wires the graph supplied, constructs the evaluation engine using the configuration supplied and starts the engine's run loop. If the graph has an output, this will collect the results in memory and return them at the end, once the run loop exits.
- Parameters:
graph (Callable) – The graph to evaluate.
config (GraphConfiguration) – The configuration used to construct the evaluation engine.
args – Any arguments to supply to the graph.
kwargs – Any kwargs to supply to the graph.
- Return type:
list[tuple[datetime, Any]] | None
- Returns:
The list of engine time and value for each tick returned by the graph if an output is present, None otherwise.
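Because the return value is either a list of `(engine time, value)` pairs or `None`, calling code usually has to handle both cases. The sketch below shows one way to do that with plain Python; `split_results` is a hypothetical helper and the sample data is made up for illustration.

```python
from datetime import datetime


def split_results(results):
    """Split evaluate_graph-style results into parallel lists of times and values."""
    if not results:  # None (the graph has no output) or an empty list
        return [], []
    times, values = zip(*results)
    return list(times), list(values)


# Illustrative data in the documented shape: list[tuple[datetime, Any]].
sample = [(datetime(2024, 1, 1), 1), (datetime(2024, 1, 2), 2)]
times, values = split_results(sample)
```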
For testing, there is a dedicated run function that allows for easy evaluation of a graph or node:
- eval_node(node, *args, resolution_dict: [<class 'str'>, typing.Any]=None, __trace__: bool = False, __trace_wiring__: bool = False, __observers__: list[EvaluationLifeCycleObserver] = None, __elide__: bool = False, __start_time__: datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 1), __end_time__: datetime = None, **kwargs)[source]
Evaluates a node using the supplied arguments. This will construct a graph to wrap the node or graph supplied with logic to feed in the supplied inputs into the node and capture the results. The function will then run the graph and return the captured results.
Note
This only works with SIMULATION mode graphs.
The inputs to the node are lists of values (or None) that can be supplied to an output of the inputs' type. If the node returns a result, the results will be collected and returned as an array. Results are captured as delta_values.
For example:
    from hgraph import compute_node, TS
    from hgraph.test import eval_node

    @compute_node
    def my_func(ts: TS[int]) -> TS[int]:
        return ts.value

    assert eval_node(my_func, [1, 2, 3]) == [1, 2, 3]
The eval_node takes the graph or node to evaluate, and then any parameters to pass to the node. The parameters are supplied as a list of values. The node interprets the list as the values to tick into the node starting from MIN_ST and incrementing by MIN_TD for each entry. If the list has None as an element this is interpreted as the input not receiving a tick at that time.
The results returned are the ticks as they appeared, with None representing no value ticked at the time-point. The result will be padded to the last input time.
For nodes that require resolution, it is possible to supply a resolution dictionary to assist in resolving the correct types when setting up the replay nodes. This is an example using the resolution dictionary:
    from hgraph import compute_node, OUT, TS
    from hgraph.test import eval_node

    @compute_node
    def my_func(ts: OUT) -> OUT:
        return ts.value

    assert eval_node(my_func, [1, 2, 3], resolution_dict={OUT: TS[int]}) == [1, 2, 3]
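The way an input list maps onto engine times (one entry per MIN_TD step starting at MIN_ST, with None meaning no tick) can be sketched in plain Python. The constants here are assumptions: MIN_ST is taken from the default `__start_time__` in the `eval_node` signature, MIN_TD is assumed to be one microsecond, and `tick_schedule` is a hypothetical helper rather than an hgraph function.

```python
from datetime import datetime, timedelta

# Assumed values, see the note above.
MIN_ST = datetime(1970, 1, 1, 0, 0, 0, 1)
MIN_TD = timedelta(microseconds=1)


def tick_schedule(values, start_time=MIN_ST, step=MIN_TD):
    """Return the (time, value) ticks implied by an eval_node-style input list."""
    # Entry i is scheduled at start_time + i * step; None entries produce no tick.
    return [(start_time + i * step, v) for i, v in enumerate(values) if v is not None]


schedule = tick_schedule([1, None, 3])
```

Here the second entry is None, so only two ticks are scheduled, at MIN_ST and at MIN_ST plus two steps.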
There are a number of additional modifiers that can be supplied, these affect the calling of the run loop or the presentation of the results.
The most useful of these include __elide__ which, when set to True, will reduce the result to only the values that actually ticked. Note that this does not provide when the values ticked, just the order in which they ticked. This can be useful when the time between ticks is large.
Another useful option is __start_time__ which will allow the start time to be adjusted, this can be useful when performing a test that requires a particular start time.
- Parameters:
node – The node to evaluate
args – Arguments to pass to the node
kwargs – Keyword arguments to pass to the node
resolution_dict ([<class ‘str’>, typing.Any]) – Dictionary of resolution keys and values to pass to the node (this should be at the input parameter level)
__trace__ – If True, the trace will be printed to the console.
__trace_wiring__ – If True, the wiring trace will be printed to the console.
__observers__ – If not None, the observers will be added to the evaluation results.
__elide__ – If True, only the ticked values will be returned. If the value is False every potential engine cycle will have a result (None if it did not tick).
__start_time__ – If not None, the time at which to start evaluation.
__end_time__ – If not None, the time at which to end evaluation.
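The difference between the padded and the elided result shapes can be illustrated with a small sketch. This is not how eval_node is implemented; `elide` is a hypothetical helper and the padded list is made-up data in the documented shape, where None marks an engine cycle in which nothing ticked.

```python
def elide(results):
    """Keep only the values that ticked, preserving their order."""
    return [value for value in results if value is not None]


# One entry per potential engine cycle; None means "no tick at that time".
padded = [1, None, None, 2, None]
ticked_only = elide(padded)
```

The elided form is shorter but loses the information about which cycle each value ticked in, which matches the caveat in the description of `__elide__`.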
The graph supports observing its state transitions, which allows useful utilities to be added. Extensions must implement the EvaluationLifeCycleObserver interface shown below:
- class EvaluationLifeCycleObserver[source]
Provide the callbacks that can be received during the evaluation of the graph. Use this with care as each additional life-cycle observer will slow down the evaluation of the graph.
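The observer idea, and why each extra observer adds cost, can be shown with a toy model. Everything in this sketch is hypothetical: the callback names, `ToyObserver` and `run_cycle` are illustrative stand-ins and should not be read as the actual EvaluationLifeCycleObserver interface.

```python
class ToyObserver:
    """Hypothetical observer base class with no-op callbacks."""

    def on_before_node_evaluation(self, node_name):
        pass

    def on_after_node_evaluation(self, node_name):
        pass


class EvalCounter(ToyObserver):
    """Counts how many times each node has been evaluated."""

    def __init__(self):
        self.counts = {}

    def on_after_node_evaluation(self, node_name):
        self.counts[node_name] = self.counts.get(node_name, 0) + 1


def run_cycle(nodes, observers):
    """Evaluate each (name, callable) node once, notifying every observer."""
    for name, evaluate in nodes:
        # Every observer is called twice per node, so cost grows with each one added.
        for observer in observers:
            observer.on_before_node_evaluation(name)
        evaluate()
        for observer in observers:
            observer.on_after_node_evaluation(name)


counter = EvalCounter()
nodes = [("a", lambda: None), ("b", lambda: None)]
for _ in range(3):
    run_cycle(nodes, [counter])
```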
- class EvaluationTrace(filter: str = None, start: bool = True, eval: bool = True, stop: bool = True, node: bool = True, graph: bool = True)[source]
Bases: EvaluationLifeCycleObserver
Logs out the different steps as the engine evaluates the graph. This is voluminous but can be helpful when tracing down unexpected behaviour.
- filter
Used to restrict which node and graph events to report using a simple containment check of the node and its path.
- start
Log start related events.
- eval
Log eval related events.
- stop
Log stop related events
- node
Log node related events
- graph
Log graph related events
- class EvaluationProfiler(start: bool = True, eval: bool = True, stop: bool = True, node: bool = True, graph: bool = True)[source]
Bases: EvaluationLifeCycleObserver
Prints out some useful metrics of the running graph, this can be used to help trace down memory leaks. Takes the following inputs to configure:
- start
Log start related events.
- eval
Log eval related events.
- stop
Log stop related events
- node
Log node related events
- graph
Log graph related events
Another useful observer is the wiring observer, which allows you to hook into the wiring process and see what it is doing. This is achieved by implementing:
- class WiringObserver[source]
Observer for wiring events. When installed in the wiring engine, the observer will receive notifications from the wiring process.
- class WiringTracer(filter: str = None, graph: bool = True, node: bool = True)[source]
Bases: WiringObserver
Prints out details of wiring resolutions performed whilst building the graph. This is helpful to work out why an overload may not be resolving as expected.
The tracer takes the following parameters:
- filter
A simple containment check of the filter string with the wiring path name.
- graph
Log graph level information
- node
Log node level information