Injectables

Here are the supported injectables.

class InjectableTypesEnum(*values)
CLOCK = 16

Needs access to the current engine clock for its logic

ENGINE_API = 32

Needs access to the engine API, typically start and end times for the engine

LOGGER = 64

This node produces logging output

NODE = 128

Requires access to the node. This should generally be reserved for use in framework nodes.

OUTPUT = 8

Access to the output is required

SCHEDULER = 4

This node will schedule itself outside of its defined inputs for evaluation

STATE = 1

This node is stateful, provide a state instance here
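
The member values listed above are distinct powers of two, so each one occupies its own bit. The sketch below is a stand-alone illustration built on Python's enum.IntFlag. The class InjectableFlags is a hypothetical stand-in, and whether the library itself combines these values with bitwise operators is an assumption that this page does not confirm.

```python
from enum import IntFlag


class InjectableFlags(IntFlag):
    """Hypothetical stand-in that mirrors the values listed above."""

    STATE = 1
    SCHEDULER = 4
    OUTPUT = 8
    CLOCK = 16
    ENGINE_API = 32
    LOGGER = 64
    NODE = 128


# Combine the requirements of a node that needs state and a scheduler.
required = InjectableFlags.STATE | InjectableFlags.SCHEDULER

# Membership tests work on the combined value.
needs_state = InjectableFlags.STATE in required
needs_clock = InjectableFlags.CLOCK in required
```

Because no two members share a bit, a single integer can record every injectable a node asks for.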

class STATE(__schema__: type[COMPOUND_SCALAR] = None, **kwargs)

Used in a compute_node or sink_node to indicate that the function requires state to be injected into the function. This is used as follows:

@dataclass
class MyStateSchema(CompoundScalar):
    p1: str = "Initial Value"

@compute_node
def my_node(ts: TIME_SERIES_VALUE, ..., _state: STATE[MyStateSchema] = None) -> OUT:
    ...

The state can be used in two ways. The first is naked, i.e. _state: STATE = None. In this case the state is effectively an attribute dictionary.

The other way is as shown above, where a dataclass (CompoundScalar) is used to describe the schema for the state. This allows state initialization to be performed without having to define a start function for the node.

The state can be accessed using the attribute name, for example:

_state.p1 = "New Value"

Alternatively, it is possible to use the __getitem__ syntax, for example:

a = _state["p1"]

The state also works as a dictionary, with methods such as keys, items and values available.

is_updated() -> bool

Whether the state has been updated since it was created or last reset.

Return type:

bool

items() -> ItemsView[str, SCALAR]

The items of the bundle

Return type:

ItemsView[str, TypeVar(SCALAR, bound= object)]

keys() -> KeysView[str]

The keys of the schema defining the bundle

Return type:

KeysView[str]

reset_updated() -> None

Resets the updated state back to false

Return type:

None

values() -> ValuesView[SCALAR]

The values of the bundle

Return type:

ValuesView[TypeVar(SCALAR, bound= object)]

property as_schema: COMPOUND_SCALAR

Exposes the state as the schema type. This is useful for type completion in tools such as PyCharm / VSCode. It is a convenience property; the properties of the schema can also be accessed directly from the state instance.

class SCHEDULER

An input that is scheduled to be evaluated at a particular time. This is used for time-series inputs that are not bound to an output, but are still required to be evaluated at a particular time.

abstractmethod has_tag(tag: str) -> bool

Whether this scheduler has the specified tag.

Return type:

bool

abstractmethod pop_tag(tag: str, default: datetime = None) -> datetime | None

Removes the tag and returns the value associated with it. If the tag is not found, the default value is returned.

Return type:

datetime | None

abstractmethod reset()

Remove all scheduled events.

abstractmethod schedule(when: datetime | timedelta, tag: str = None, on_wall_clock: bool = False)

Schedule the node to be evaluated at the time specified. If tag is set, the scheduled event is associated with the tag. If a schedule is already set against the tag, it is replaced with the new entry.

abstractmethod un_schedule(tag: str = None)

If tag is set, this removes the scheduled event associated with the tag. If nothing is scheduled for the tag, nothing is done. If tag is not set, the next scheduled item is removed.

abstract property is_scheduled: bool

Whether there are any pending scheduling events.

abstract property is_scheduled_now: bool

Whether this was scheduled in this engine cycle, that is, whether the node was scheduled for evaluation at the current engine_time.

abstract property next_scheduled_time: datetime

The time that this input is scheduled to be evaluated.
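
The tag semantics described for schedule, un_schedule and pop_tag can be sketched with a small in-memory model. SketchScheduler is a hypothetical stand-in, not the library's implementation. It assumes that a timedelta is an offset from the current engine time and that "the next scheduled item" means the earliest pending event, and it leaves out on_wall_clock because its behavior is not described here.

```python
from datetime import datetime, timedelta


class SketchScheduler:
    """Hypothetical in-memory stand-in; not the library's scheduler."""

    def __init__(self, now: datetime):
        self._now = now      # stands in for the current engine time
        self._events = []    # list of (scheduled time, tag or None)

    def schedule(self, when, tag=None):
        # Assumption: a timedelta is an offset from the current engine time.
        at = self._now + when if isinstance(when, timedelta) else when
        if tag is not None:
            # An existing entry for the same tag is replaced.
            self._events = [(t, g) for t, g in self._events if g != tag]
        self._events.append((at, tag))

    def un_schedule(self, tag=None):
        if tag is not None:
            # Nothing scheduled for the tag means nothing is done.
            self._events = [(t, g) for t, g in self._events if g != tag]
        elif self._events:
            # No tag: remove the earliest pending event.
            self._events.remove(min(self._events, key=lambda e: e[0]))

    def has_tag(self, tag):
        return any(g == tag for _, g in self._events)

    def pop_tag(self, tag, default=None):
        for event in self._events:
            if event[1] == tag:
                self._events.remove(event)
                return event[0]
        return default

    def reset(self):
        self._events.clear()

    @property
    def is_scheduled(self):
        return bool(self._events)

    @property
    def next_scheduled_time(self):
        return min(t for t, _ in self._events) if self._events else None


start = datetime(2024, 1, 1)
scheduler = SketchScheduler(start)
scheduler.schedule(timedelta(seconds=10), tag="retry")
scheduler.schedule(timedelta(seconds=5), tag="retry")  # replaces the entry above
scheduler.schedule(start + timedelta(seconds=1))       # untagged, absolute time
next_time = scheduler.next_scheduled_time
popped = scheduler.pop_tag("retry")
missing = scheduler.pop_tag("retry", default=start)
```

Using a tag gives a node a handle on a pending event, so it can reschedule or cancel a timer without tracking the scheduled time itself.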

LOGGER

alias of Logger

class EvaluationClock

The evaluation clock provides a view on time in the currently evaluating graph. Time in the graph is dependent on the mode of execution. In simulation mode, the clock presented is simulated to allow for processing historical data in rapid time. In realtime mode, the clock represents the system clock (time is always presented in UTC). By using the clock abstraction to query time-related information, it is possible to ensure that logic can be written independently of how the system chooses to move through time.

abstract property cycle_time: timedelta

The amount of time spent in computation from the beginning of the evaluation of the graph until the point where this property is called.

abstract property evaluation_time: datetime

The time of the source event initiating this evaluation cycle. This time remains the same for each node processed until the graph completes one complete step through the nodes in the graph.

abstract property next_cycle_evaluation_time: datetime

The next smallest evaluation time that can be scheduled. This is a convenience property and is equivalent to evaluation_time + MIN_TD.

abstract property now: datetime

This represents the current time. In a realtime engine, this is the system clock. In simulation mode, this is evaluation_time + lag.
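
The relationships between the clock properties in simulation mode can be sketched as follows. SketchSimulationClock is a hypothetical stand-in, not the library's clock. The value of MIN_TD (one microsecond) and the reading of "lag" as the computation time spent so far in the cycle are both assumptions made for illustration.

```python
import time
from datetime import datetime, timedelta

# Assumption: the smallest schedulable time step is one microsecond.
MIN_TD = timedelta(microseconds=1)


class SketchSimulationClock:
    """Hypothetical stand-in for a simulation-mode clock."""

    def __init__(self, evaluation_time: datetime):
        self._evaluation_time = evaluation_time
        self._cycle_started = time.perf_counter()

    @property
    def evaluation_time(self) -> datetime:
        # Fixed for the whole evaluation cycle.
        return self._evaluation_time

    @property
    def cycle_time(self) -> timedelta:
        # Computation time spent since the cycle began.
        return timedelta(seconds=time.perf_counter() - self._cycle_started)

    @property
    def next_cycle_evaluation_time(self) -> datetime:
        return self._evaluation_time + MIN_TD

    @property
    def now(self) -> datetime:
        # Assumption: "lag" is the computation time spent so far.
        return self._evaluation_time + self.cycle_time


clock = SketchSimulationClock(datetime(2024, 1, 1, 9, 30))
step = clock.next_cycle_evaluation_time - clock.evaluation_time
```

Logic that reads evaluation_time behaves the same on historical and live data, whereas now moves forward within a cycle as computation proceeds.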

class EvaluationEngineApi

The user visible API for the evaluation engine.

abstractmethod add_after_evaluation_notification(fn: callable)

Add an after evaluation notification observer. The notification is called once after the evaluation of the current cycle.

abstractmethod add_before_evaluation_notification(fn: callable)

Add a before evaluation notification observer. The notification is called once before the next evaluation cycle.

abstractmethod add_life_cycle_observer(observer: EvaluationLifeCycleObserver)

Add a graph engine life-cycle observer. Life cycle events will immediately start to be delivered to the observer. The observer will continue to receive events until it is removed.

abstractmethod remove_life_cycle_observer(observer: EvaluationLifeCycleObserver)

Remove the provided life-cycle observer from the engine. This is immediately effective.

abstractmethod request_engine_stop()

Request the evaluation engine to stop processing events and exit. This will not stop the graph immediately, and will only be processed after the current evaluation cycle has completed.

abstract property end_time: datetime

The end time of the evaluation engine.

abstract property evaluation_clock: EvaluationClock

The evaluation clock.

abstract property evaluation_mode: EvaluationMode

The current mode of evaluation

abstract property is_stop_requested: bool

Returns True if the engine has been requested to stop.

abstract property start_time: datetime

The start time of the evaluation engine.
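
The one-shot notification and deferred-stop behavior described for this API can be sketched with a toy engine loop. SketchEngine and its run method are hypothetical stand-ins, not the library's engine. Each "cycle" is a plain callable standing in for the evaluation of the graph's nodes.

```python
class SketchEngine:
    """Hypothetical stand-in showing notification and stop semantics."""

    def __init__(self):
        self._before = []
        self._after = []
        self._stop_requested = False

    def add_before_evaluation_notification(self, fn):
        self._before.append(fn)

    def add_after_evaluation_notification(self, fn):
        self._after.append(fn)

    def request_engine_stop(self):
        # Only records the request; the current cycle still completes.
        self._stop_requested = True

    @property
    def is_stop_requested(self):
        return self._stop_requested

    def run(self, cycles):
        """Run the given cycles in order and return how many completed."""
        completed = 0
        for evaluate in cycles:
            # Notifications are one-shot: take the list, then clear it.
            before, self._before = self._before, []
            for fn in before:
                fn()
            evaluate(self)
            after, self._after = self._after, []
            for fn in after:
                fn()
            completed += 1
            # The stop request is honored only once the cycle has finished.
            if self._stop_requested:
                break
        return completed


log = []


def first_cycle(engine):
    log.append("cycle 1")
    engine.add_after_evaluation_notification(lambda: log.append("after 1"))
    engine.request_engine_stop()
    log.append("cycle 1 continues")


def second_cycle(engine):
    log.append("cycle 2")


engine = SketchEngine()
completed = engine.run([first_cycle, second_cycle])
```

In this sketch the first cycle runs to completion and its after-evaluation callback fires before the loop exits, and the second cycle never starts.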