Graph Run Loop

The graph run loop consists of a configuration and a run loop function.

class EvaluationMode(*values)[source]

The mode to use when executing the graph.

The MODES are as follows:

REAL_TIME

The graph will run from start time until the graph is either requested to stop or is killed. If start time is in the past the graph will potentially replay historical PULL source nodes until it has caught up and will then continue from that point on once it catches up to the wall clock time it started. PUSH source nodes are only evaluated in REAL_TIME mode, these will only be started and evaluated in REAL_TIME mode. Events from PULL source nodes are only PULLED into the graph once we are caught up and are processed in a first come - first served basis. I.e. there is no cross node arrival time ordering of PUSH source nodes. Time ordering is managed within a PUSH source nodes output when the node is queueing, but the ordering is limited to when the received value is placed into the graph, not the time-delta between the events.

SIMULATION

The graph will run from start time to end time, all events to be processed are introduced from PULL source nodes and when there are no new events to process the graph is considered complete. In this mode the wall clock time (clock.now) is simulated and updated to be the next events time at the end of an evaluation cycle. PUSH source nodes are not supported in this mode of operation.

REAL_TIME = 0
SIMULATION = 1
class GraphConfiguration(run_mode: EvaluationMode = EvaluationMode.SIMULATION, start_time: datetime = datetime.datetime(1970, 1, 1, 0, 0), end_time: datetime = datetime.datetime(2299, 12, 31, 23, 59, 59, 999999), trace: bool | dict = False, profile: bool | dict = False, life_cycle_observers: tuple[~hgraph._runtime._evaluation_engine.EvaluationLifeCycleObserver, ...]=(), trace_wiring: bool | dict = False, wiring_observers: tuple[~hgraph._wiring._wiring_observer.WiringObserver, ...]=(), graph_logger: Logger = <factory>, trace_back_depth: int = 1, capture_values: bool = False, default_log_level: int = 10, logger_formatter: Callable = None, cleanup_on_error: bool = True)[source]

The configuration to be supplied to evaluate_graph. The following properties are defined:

run_mode

Either REAL_TIME or SIMULATION.

start_time

The first time to evaluate the engine for, this cannot be earlier than MIN_ST.

end_time

The last time to evaluate the engine for (inclusive), this cannot be later than MAX_ET.

trace

Turn on tracing by setting this to True. It is also possible to be selective with tracing by setting this to a dict of the form {"start": False, "stop": False, "eval": False}. Setting the value to be True will turn on tracing of this element, False will turn off tracing of the particular life-cycle. For more information on available options see: hgraph.test.EvaluationTrace

profile

Similar to tracing, except setting this will turn on profiling of the graph. See hgraph.test.EvaluationProfiler for more information as to the options.

life_cycle_observers

This allows for additional life-cycle observers to be registered. This should be supplied as a tuple of hgraph.EvaluationLifeCycleObserver instances.

trace_wiring

This indicates an interest in observing the wiring choices made during the wiring stage of the graph. As with tracing and profiling, a dictionary of options can also be supplied see hgraph.test.WiringTracer for more information on the options.

wiring_observers

This allows for custom wiring observers to be registered. This should be supplied as a tuple of hgraph.WiringObserver instances.

graph_logger

The instance of the Python Logger to use for graph logging, by default an instance of the logger will be setup and registered under the ‘hgraph’ name. (Can be retrieved using getLogger('hgraph'))

trace_back_depth

Used as a parameter to the error handling logic to determine the depth of traceback to capture.

capture_values

capture the values of the inputs to the trace-back (default is False)

default_log_level

The default log level to use, the default is DEBUG.

logger_formatter

Use to provide a custom formatter to override the default logger._log method. The node_path is supplied as an extra argument to the formatter as well as the original log method (as __orig_log__). An example formatter is provided as node_path_log_formatter.

capture_values: bool = False
cleanup_on_error: bool = True
default_log_level: int = 10
end_time: datetime = datetime.datetime(2299, 12, 31, 23, 59, 59, 999999)
property error_capture_options
graph_logger: Logger
life_cycle_observers: tuple[EvaluationLifeCycleObserver, ...] = ()
logger_formatter: Callable = None
profile: bool | dict = False
run_mode: EvaluationMode = 1
start_time: datetime = datetime.datetime(1970, 1, 1, 0, 0)
trace: bool | dict = False
trace_back_depth: int = 1
trace_wiring: bool | dict = False
wiring_observers: tuple[WiringObserver, ...] = ()
evaluate_graph(graph: Callable, config: GraphConfiguration, *args, **kwargs) list[tuple[datetime, Any]] | None[source]

Wires the graph supplied, constructs the evaluation engine using the configuration supplied and starts the engines’ run loop. If the graph has an ouput, this will collect the results in memory and return them as the end, once the run loop exists.

Parameters:
  • graph (Callable) – The graph to evaluate.

  • config (GraphConfiguration) – The configuration used to construct the evaluation engine.

  • args – Any arguments to supply to the graph.

  • kwargs – Any kwargs to supply to the graph.

Return type:

list[tuple[datetime, Any]] | None

Returns:

The list of engine time and value for each tick returned by the graph if an output is present, None otherwise.

For testing there is this dedicated run function that will allow for easy evaluation of a graph or node:

eval_node(node, *args, resolution_dict: [<class 'str'>, typing.Any]=None, __trace__: bool = False, __trace_wiring__: bool = False, __observers__: list[EvaluationLifeCycleObserver] = None, __elide__: bool = False, __start_time__: datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 1), __end_time__: datetime = None, **kwargs)[source]

Evaluates a node using the supplied arguments. This will construct a graph to wrap the node or graph supplied with logic to feed in the supplied inputs into the node and capture the results. The function will then run the graph and return the captured results.

Note

This only works with SIMULATION mode graphs.

The inputs to the node are lists of values (or None) that can be supplied to an output of the inputs’ type. If the node returns a result, the results will be collected and returned as an array. Results are captured as delta_value s.

For example:

@compute_node
def my_func(ts: TS[int]) -> TS[int]:
    return ts.value

assert eval_node(my_func, [1, 2, 3]) == [1, 2, 3]

The eval_node takes the graph or node to evaluate, and then any parameters to pass to the node. The parameters are supplied as a list of values. The node interprets the list as the values to tick into the node starting from MIN_ST and incrementing by MIN_TD for each entry. If the list has None as an element this is interpreted as the input not receiving a tick at that time.

The result returned are the ticks as they appeared, with None representing no value ticked at the time-point. The result will be padded to the last input time.

For nodes that require resolution, it is possible to supply a resolution dictionary to assist in resolving the correct types when setting up the replay nodes. This is an example using the resolution dictionary:

@compute_node
def my_func(ts: OUT) -> OUT:
    return ts.value

assert eval_node(my_func, [1, 2, 3], resolution_dict={OUT: TS[int]}) == [1, 2, 3]

There are a number of additional modifiers that can be supplied, these affect the calling of the run loop or the presentation of the results.

The most useful of these include __elide__ which, when set to True, will reduce the result to only the values that actually ticked. Note that this does not provide when the values ticked, just the order in which they ticked. This can be useful when the time between ticks is large.

Another useful option is __start_time__ which will allow the start time to be adjusted, this can be useful when performing a test that requires a particular start time.

Parameters:
  • node – The node to evaluate

  • args – Arguments to pass to the node

  • kwargs – Keyword arguments to pass to the node

  • resolution_dict ([<class ‘str’>, typing.Any]) – Dictionary of resolution keys and values to pass to the node (this should be at the input parameter level)

  • __trace__ – If True, the trace will be printed to the console.

  • __trace_wiring__ – If True, the wiring trace will be printed to the console.

  • __observers__ – If not None, the observers will be added to the evaluation results.

  • __elide__ – If True, only the ticked values will be returned. If the value is False every potential engine cycle will have a result (None if it did not tick).

  • __start_time__ – If not None, the time at which to start evaluation.

  • __end_time__ – If not None, the time at which to end evaluation.

The graph supports observing the state transitions, this allows for some useful utilities to be added, the extensions must implement the EvaluationLifeCycleObserver interface shown below:

class EvaluationLifeCycleObserver[source]

Provide the callbacks that can be received during the evaluation of the graph. Use this with care as each additional life-cycle observer will slow down the evaluation of the graph.

on_after_graph_evaluation(graph: Graph)[source]

Called after the graph is evaluated.

on_after_graph_push_nodes_evaluation(graph: Graph)[source]

Called after the graph has evaluated all its push nodes.

on_after_node_evaluation(node: Node)[source]

Called after a node is evaluated.

on_after_start_graph(graph: Graph)[source]

Called after the graph is started.

on_after_start_node(node: Node)[source]

Called after a node is started.

on_after_stop_graph(graph: Graph)[source]

Called after the graph is stopped.

on_after_stop_node(node: Node)[source]

Called after a node is stopped.

on_before_graph_evaluation(graph: Graph)[source]

Called before the graph is evaluated.

on_before_node_evaluation(node: Node)[source]

Called before a node is evaluated.

on_before_start_graph(graph: Graph)[source]

Called before the graph is started.

on_before_start_node(node: Node)[source]

Called before a node is started.

on_before_stop_graph(graph: Graph)[source]

Called before the graph is stopped.

on_before_stop_node(node: Node)[source]

Called before a node is stopped.

class EvaluationTrace(filter: str = None, start: bool = True, eval: bool = True, stop: bool = True, node: bool = True, graph: bool = True)[source]

Bases: EvaluationLifeCycleObserver

Logs out the different steps as the engine evaluates the graph. This is voluminous but can be helpful tracing down unexpected behaviour.

filter

Used to restrict which node and graph events to report using a simple containment check of the node and its path.

start

Log start related events.

eval

Log eval related events.

stop

Log stop related events

node

Log node related events

graph

Log graph related events

static set_print_all_values(value: bool)[source]

To see all values (not only ticked ones) in the trace set this to True.

static set_use_logger(value: bool)[source]

To use print instead of the logger set this to False.

on_after_graph_evaluation(graph: Graph)[source]

Called after the graph is evaluated.

on_after_node_evaluation(node: Node)[source]

Called after a node is evaluated.

on_after_start_graph(graph: Graph)[source]

Called after the graph is started.

on_after_start_node(node: Node)[source]

Called after a node is started.

on_after_stop_graph(graph: Graph)[source]

Called after the graph is stopped.

on_after_stop_node(node: Node)[source]

Called after a node is stopped.

on_before_graph_evaluation(graph: Graph)[source]

Called before the graph is evaluated.

on_before_node_evaluation(node: Node)[source]

Called before a node is evaluated.

on_before_start_graph(graph: Graph)[source]

Called before the graph is started.

on_before_start_node(node: Node)[source]

Called before a node is started.

on_before_stop_graph(graph: Graph)[source]

Called before the graph is stopped.

on_before_stop_node(node: Node)[source]

Called before a node is stopped.

class EvaluationProfiler(start: bool = True, eval: bool = True, stop: bool = True, node: bool = True, graph: bool = True)[source]

Bases: EvaluationLifeCycleObserver

Prints out some useful metrics of the running graph, this can be used to help trace down memory leaks. Takes the following inputs to configure:

start

Log start related events.

eval

Log eval related events.

stop

Log stop related events

node

Log node related events

graph

Log graph related events

on_after_graph_evaluation(graph: Graph)[source]

Called after the graph is evaluated.

on_after_node_evaluation(node: Node)[source]

Called after a node is evaluated.

on_after_start_graph(graph: Graph)[source]

Called after the graph is started.

on_after_start_node(node: Node)[source]

Called after a node is started.

on_after_stop_graph(graph: Graph)[source]

Called after the graph is stopped.

on_after_stop_node(node: Node)[source]

Called after a node is stopped.

on_before_graph_evaluation(graph: Graph)[source]

Called before the graph is evaluated.

on_before_node_evaluation(node: Node)[source]

Called before a node is evaluated.

on_before_start_graph(graph: Graph)[source]

Called before the graph is started.

on_before_start_node(node: Node)[source]

Called before a node is started.

on_before_stop_graph(graph: Graph)[source]

Called before the graph is stopped.

on_before_stop_node(node: Node)[source]

Called before a node is stopped.

Another useful observer is the wiring observer, this allows to hook into the wiring process and see what it is doing. This is achieved by implementing:

class WiringObserver[source]

Observer for wiring events. When installed in the wiring engine, the observer will receive notifications from wiring process

on_enter_graph_wiring(signature: WiringNodeSignature)[source]
on_enter_nested_graph_wiring(signature: WiringNodeSignature)[source]
on_enter_node_wiring(signature: WiringNodeSignature)[source]
on_exit_graph_wiring(signature: WiringNodeSignature, error)[source]
on_exit_nested_graph_wiring(signature: WiringNodeSignature, error)[source]
on_exit_node_wiring(signature: WiringNodeSignature, error)[source]
on_overload_resolution(signature: WiringNodeSignature, selected_overload, rejected_overloads, ambiguous_overloads)[source]
class WiringTracer(filter: str = None, graph: bool = True, node: bool = True)[source]

Bases: WiringObserver

Prints out details of wiring resolutions performed whilst building the graph. This is helpful to work out why an overload may not be resolving as expected.

The tracer takes the following parameters:

filter

A simple containment check of the filter string with the wiring path name.

graph

Log graph level information

node

Log node level information

on_enter_graph_wiring(signature: WiringNodeSignature)[source]
on_enter_nested_graph_wiring(signature: WiringNodeSignature)[source]
on_enter_node_wiring(signature: WiringNodeSignature)[source]
on_exit_graph_wiring(signature: WiringNodeSignature, error)[source]
on_exit_nested_graph_wiring(signature: WiringNodeSignature, error)[source]
on_exit_node_wiring(signature: WiringNodeSignature, error)[source]
on_overload_resolution(signature: WiringNodeSignature, selected_overload, rejected_overloads, ambiguous_overloads)[source]