Dynamic Graphs

map_(func: Callable, args: TSB[TS_SCHEMA], kwargs: TSB[TS_SCHEMA_1]) -> TIME_SERIES_TYPE

Apply a node or lambda element-wise over multiplexed time-series inputs (TSD or TSL).

map_ demultiplexes its inputs, calls func once per key/index, and collects the results into a multiplexed output. The types of func’s parameters are inferred at wiring time from the demultiplexed components of the supplied inputs.

Parameters:
  • func (Callable[..., Any]) – A @graph/@compute_node decorated function or a lambda. When a TSD input is supplied, func receives (key, value, ...) where key is the dictionary key and value is the per-key time-series. When a TSL input is supplied, func receives the per-index time-series.

  • args (TimeSeriesBundleInput[TypeVar(TS_SCHEMA, bound= TimeSeriesSchema)]) – Time-series inputs to demultiplex and pass to func.

  • __label__ – Optional label for debugging/tracing.

  • kwargs (TimeSeriesBundleInput[TypeVar(TS_SCHEMA_1, bound= TimeSeriesSchema)]) – Named time-series inputs to demultiplex and pass to func.

Return type:

TypeVar(TIME_SERIES_TYPE, bound= TimeSeries)

Returns:

A WiringPort representing the multiplexed output.

TSD extensions:

  • __keys__: TSS[SCALAR] — explicit key set for demultiplexing.

  • __key_arg__: str = 'key' — name of the parameter that receives the key.

  • Wrap an input with no_key() to exclude it from key-set inference.

  • Wrap an input with pass_through() to pass it directly without demultiplexing.

Example — map over a TSD:

config: TSD[str, TSB[Config]] = ...

# func receives (key: TS[str], c: TSB[Config]) per entry
map_(lambda key, c: publish_multitable("data", key, random_values(c)), config)

Example — map a decorated node:

@compute_node
def process(key: TS[str], value: TS[float]) -> TS[float]: ...

map_(process, my_tsd)
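The demultiplex, call-per-key, and collect steps described for map_ can be pictured with ordinary Python dicts. This is only a conceptual sketch and not hgraph code: `map_model` and `PassThrough` are hypothetical names, plain values stand in for time-series, and the sketch assumes all multiplexed inputs share the same keys.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class PassThrough:
    """Hypothetical stand-in for pass_through(): the value is not demultiplexed."""
    value: Any


def map_model(func: Callable[..., Any], *args: Any) -> dict:
    """Conceptual model of map_ over dict inputs (not the hgraph implementation)."""
    # Inputs that are plain dicts are treated as multiplexed (TSD-like).
    multiplexed = [a for a in args if isinstance(a, dict)]
    # Assumption for this sketch: every multiplexed input shares the same keys,
    # so the key set is simply taken from the first one.
    keys = list(multiplexed[0]) if multiplexed else []
    out = {}
    for key in keys:
        per_key_args = [
            a.value if isinstance(a, PassThrough) else a[key] for a in args
        ]
        # func receives the key first, then the per-key values.
        out[key] = func(key, *per_key_args)
    return out


prices = {"a": 1.0, "b": 2.5}
scaled = map_model(lambda key, p, factor: p * factor, prices, PassThrough(10.0))
print(scaled)  # {'a': 10.0, 'b': 25.0}
```

The pass-through value reaches every call unchanged, while the dict input is split so that each call sees only the value for its own key.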

See also: ../concepts/tsd_map_node

reduce(func: Callable[[TIME_SERIES_TYPE, TIME_SERIES_TYPE_1], TIME_SERIES_TYPE], ts: TimeSeriesDictInput[K, TIME_SERIES_TYPE_1] | TimeSeriesListInput[TIME_SERIES_TYPE_1, SIZE], zero: TIME_SERIES_TYPE = <object object>, is_associative: bool = True) -> TIME_SERIES_TYPE

Reduce the input time-series collection into a single time-series value. The zero must be compatible with TIME_SERIES_TYPE and be constructible as const(zero, TIME_SERIES_TYPE). If the function is associative, TIME_SERIES_TYPE must be the same as TIME_SERIES_TYPE_1. When the function is associative (is_associative=True, the default), reduce performs a tree reduction; otherwise it performs a linear reduction. The tree reduction is much faster to update when an input changes.

By definition, a reduce function over a TSD must be commutative and associative, because the order of the inputs is not guaranteed. Only a TSL supports non-commutative reduce functions.
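The difference between the two reduction shapes can be illustrated in plain Python. This is a minimal sketch under stated assumptions, not the hgraph implementation: `tree_reduce` is a hypothetical helper, plain integers stand in for time-series, and padding odd levels with zero is a choice made for this sketch (it requires zero to be an identity for func).

```python
from functools import reduce as linear_reduce
from operator import add, sub


def tree_reduce(func, values, zero):
    """Pairwise (tree) reduction; odd-length levels are padded with zero."""
    level = list(values)
    if not level:
        return zero
    while len(level) > 1:
        if len(level) % 2:
            level.append(zero)
        # Combine neighbours, halving the number of values at each level.
        level = [func(level[i], level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


values = [1, 2, 3, 4, 5]

# An associative function gives the same answer either way.
tree_sum = tree_reduce(add, values, 0)
linear_sum = linear_reduce(add, values, 0)
print(tree_sum, linear_sum)  # 15 15

# A non-associative function does not, so it needs the linear form.
tree_diff = tree_reduce(sub, values, 0)
linear_diff = linear_reduce(sub, values, 0)
print(tree_diff == linear_diff)  # False
```

In the tree form a changed input only affects the combinations on its path to the root, whereas in the linear form every step after the changed input has to be redone.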

Example [TSD]:

tsd: TSD[str, TS[int]] = ...
out = reduce(add_, tsd, 0)
>> tsd <- {'a': [1], 'b': [4], 'c': [7]}
>> out -> 12

Example [TSL]:

tsl: TSL[TS[int], SIZE] = ...
out = reduce(add_, tsl, 0)
>> tsl <- ([1], [2], [3], [4], [5])
>> out -> 15

Example [TS[tuple[SCALAR, ...]]]:

ts: TS[tuple[int, ...]] = ...
initial_value: TS[str] = ...
out = reduce(lambda x, y: format_("{}, {}", x, y), ts, initial_value)

Return type:

TypeVar(TIME_SERIES_TYPE, bound= TimeSeries)

NOTE: The only non-associative reduce supported over a TSD is TSD[int, TIME_SERIES_TYPE_1] with is_associative=False.

The integer keys are expected to form a uniform sequence from 0 to size-1, with no holes. Removals must leave a key set of the form [0:n], where n is the last element in the new set. This form allows the input type to differ from the output type, as in the tuple example. The zero element is the default result when no value is supplied, and it is also the initial input to the chain of keys: the output of element n-1 is used as the lhs input of element n, and the values from the dict are used as the rhs inputs.
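The chaining rule in the note can be sketched in plain Python. This is a conceptual model only: `chain_reduce` is a hypothetical name, a dict of plain values stands in for the TSD[int, ...], and an f-string stands in for format_.

```python
from typing import Any, Callable


def chain_reduce(func: Callable[[Any, Any], Any], items: dict, zero: Any) -> Any:
    """Linear reduce over integer keys 0..size-1 (conceptual model only)."""
    size = len(items)
    # The keys must be exactly 0, 1, ..., size-1 with no holes.
    if sorted(items) != list(range(size)):
        raise ValueError("keys must form a contiguous sequence starting at 0")
    acc = zero  # zero is the result when there are no items
    for i in range(size):
        # Output of the previous step is the lhs; the dict value is the rhs.
        acc = func(acc, items[i])
    return acc


joined = chain_reduce(lambda lhs, rhs: f"{lhs}, {rhs}", {0: 1, 1: 2, 2: 3}, "start")
print(joined)  # start, 1, 2, 3
```

Here the inputs are integers while the result is a string, which is the kind of input/output type difference that the linear form permits.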

switch_(key: TimeSeriesValueInput[SCALAR], cases: dict[SCALAR, Callable], *args, reload_on_ticked: bool = False, **kwargs) -> TIME_SERIES_TYPE | None

Selects and instantiates a graph based on the value of the key time-series input. By default, when the key changes, a new instance of the selected graph is created and run. The graph is evaluated when it is first created and thereafter as its inputs tick, as normal. Code that depends on an input having ticked is only evaluated when that input next ticks (unless the input has ticked when the graph is wired in).

switch_ is one of the graph-shaping operators. It allows graphs of different shapes to operate on the same inputs and return the same output. For example, when there are several order types, the graph to evaluate can be chosen dynamically based on the order type.

This node has the overhead of instantiating and tearing down subgraphs as the key changes, and any use of switch_ should take this overhead into account. On the positive side, once a graph is instantiated, its performance is the same as if it had been wired in directly, so switch_ works well when the key changes infrequently.

The mapped graphs/nodes can take a first argument of the same type as the key, in which case the key is mapped into that argument. If the first argument is not of the same type as the key, or one of the kwargs matches the name of the first argument, the key is not mapped into it.

Example:

key: TS[str] = ...
ts1: TS[int] = ...
ts2: TS[int] = ...
out = switch_(key, {'add': add_, 'sub': sub_}, ts1, ts2)

This performs the computation selected by the key’s value.

A default option can be provided by using the DEFAULT marker. For example:

out = switch_(key, {DEFAULT: add_}, ...)

Return type:

Optional[TypeVar(TIME_SERIES_TYPE, bound= TimeSeries)]
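The select-and-instantiate behaviour of switch_, including the DEFAULT fallback and the cost of rebuilding on a key change, can be modelled in plain Python. This is a hedged sketch and not hgraph code: `SwitchModel`, its `instantiations` counter, and the local `DEFAULT` object are hypothetical stand-ins, and plain functions play the role of graphs.

```python
from operator import add, sub
from typing import Any

DEFAULT = object()  # hypothetical stand-in for the DEFAULT marker
_UNSET = object()   # marks "no key seen yet"


class SwitchModel:
    """Conceptual model: one active case, rebuilt only when the key changes."""

    def __init__(self, cases: dict):
        self.cases = cases
        self.active_key: Any = _UNSET
        self.active = None
        self.instantiations = 0  # counts how often a case was (re)built

    def evaluate(self, key: Any, *args: Any) -> Any:
        if self.active_key is _UNSET or key != self.active_key:
            # Key changed: drop the old case and select the new one,
            # falling back to DEFAULT when the key has no entry.
            self.active = self.cases.get(key, self.cases.get(DEFAULT))
            self.active_key = key
            self.instantiations += 1
        if self.active is None:
            return None  # no matching case and no DEFAULT
        return self.active(*args)


sw = SwitchModel({"add": add, "sub": sub})
results = [sw.evaluate("add", 5, 3), sw.evaluate("add", 1, 1), sw.evaluate("sub", 5, 3)]
print(results, sw.instantiations)  # [8, 2, 2] 2

fallback = SwitchModel({DEFAULT: add})
print(fallback.evaluate("anything", 2, 2))  # 4
```

Three evaluations trigger only two instantiations, because the second call reuses the case that is already active. This is why an infrequently changing key keeps the overhead low.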

mesh_(func: Callable, *args, **kwargs)

Wraps the given graph into a calculation mesh: a structure akin to map_ that also allows instances of the graph to access the outputs of other instances of the graph. New instances are created on demand from inner graphs as well as from the keys’ inputs.
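As a rough analogy for the mesh idea, the following plain-Python sketch creates instances on demand and lets each one read the outputs of others. It is not the hgraph API: `MeshModel`, its `get` method, and the `fib` example are hypothetical and chosen only for illustration.

```python
from typing import Any, Callable, Dict


class MeshModel:
    """Conceptual model of a calculation mesh (not the hgraph implementation)."""

    def __init__(self, func: Callable[["MeshModel", Any], Any]):
        self.func = func
        self.outputs: Dict[Any, Any] = {}

    def get(self, key: Any) -> Any:
        # Create the instance for `key` on demand, then reuse its output.
        if key not in self.outputs:
            self.outputs[key] = self.func(self, key)
        return self.outputs[key]


def fib(mesh: MeshModel, n: int) -> int:
    # Each instance reads the outputs of other instances of the same graph.
    return n if n < 2 else mesh.get(n - 1) + mesh.get(n - 2)


mesh = MeshModel(fib)
print(mesh.get(10))  # 55
print(sorted(mesh.outputs))  # keys 0..10 were all created on demand
```

Only key 10 was requested from outside. The other instances exist because inner calculations asked for them, which mirrors the on-demand creation described above.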