# Source code for hgraph._operators._time_series_conversion

from datetime import timedelta

from hgraph._types._ts_type import TS
from hgraph._types._scalar_types import DEFAULT, SCALAR
from hgraph._types._time_series_types import OUT
from hgraph._types._time_series_types import TIME_SERIES_TYPE
from hgraph._wiring._decorators import operator

__all__ = ("const", "convert", "combine", "collect", "emit")


@operator
def const(value: SCALAR, tp: type[OUT] = TS[SCALAR], delay: timedelta = timedelta()) -> DEFAULT[OUT]:
    """
    Emit ``value`` as a single tick at the start of graph evaluation; after that tick this node does nothing.

    :param value: The value, in a form appropriate for the time-series type given by ``tp``.
    :param tp: Used to resolve the output type. Defaults to ``TS[SCALAR]``, where ``SCALAR`` is the type of
        ``value``.
    :param delay: How long to delay the value by. Defaults to ``timedelta()``, i.e. no delay.
    :return: A single tick of the supplied value.
    """
    # Declaration only: this body contains no implementation.
    ...
@operator
def convert(
    ts: TIME_SERIES_TYPE,
    to: type[OUT] = DEFAULT[OUT],
    **kwargs
) -> OUT:
    """
    Convert the incoming time series to the desired result type.

    The target type can be supplied in one of two ways. Either as the second argument:

    ::

        c = const(..., TS[set[str]])
        convert(c, TS[tuple[str, ...]])

    or by subscripting the operator:

    ::

        convert[TS[tuple[str, ...]]](c)

    :param ts: The time series to convert.
    :param to: The desired result type. Defaults to ``DEFAULT[OUT]``.
    :param kwargs: Additional keyword arguments; this declaration does not interpret them.
    :return: The converted time series, of type ``OUT``.
    """
    # Declaration only: this body contains no implementation.
    ...
@operator
def combine(
    *args: TIME_SERIES_TYPE,
    **kwargs
) -> DEFAULT[OUT]:
    """
    Combine the incoming time series into a single collection time-series output.

    :param args: The time series to combine, supplied positionally.
    :param kwargs: Additional keyword arguments; this declaration does not interpret them.
    :return: One collection time-series built from the inputs.
    """
    # Declaration only: this body contains no implementation.
    ...
@operator
def collect(ts: TIME_SERIES_TYPE) -> DEFAULT[OUT]:
    """
    Converts the ``ts`` value to a collection time-series.

    The time-series to convert to must be provided declaratively. This is done by setting ``OUT`` to the
    desired result type, for example:

    ::

        ts = const(1)
        r = collect[OUT: TS[tuple[int, ...]]](ts)

    Here the resultant type is set to be a time-series of tuples; the input would be a time-series of
    integers.

    For processing a result type of ``dict`` or ``TSD`` the input must consist of a time-series of two
    time-series values, where one represents the keys and the other represents the values. For example:

    ::

        ts = const(frozendict({"a": 1, "b": 2}), TSB[ts_schema(key=TS[str], value=TS[int])])
        r = collect[OUT: TS[Mapping[str, int]]](ts)

    Alternative syntax includes:

    ::

        key = const("a")
        value = const(1)
        r = collect[OUT: TS[Mapping[str, int]]](key=key, value=value)

    Or options such as the input being a ``TSL[TS[SCALAR], Size[2]]`` (in this case the key and value must
    be of the same type). It is also possible to accept a ``TS[Mapping[str, int]]`` or a
    ``TS[tuple[str, int]]``.

    :param ts: The time series whose values are to be collected.
    :return: A collection time-series of the type selected via ``OUT``.
    :raises NotImplementedError: If this body is executed. NOTE(review): whether the ``@operator`` decorator
        ever runs this body is not visible from this file - confirm against the decorator.
    """
    # ``NotImplemented`` is the sentinel returned from binary special methods and is not callable;
    # ``NotImplementedError`` is the exception class that carries the intended message.
    raise NotImplementedError(f"Not implemented for input type {type(ts)}")
@operator
def emit(
    ts: TIME_SERIES_TYPE
) -> DEFAULT[OUT]:
    """
    Unpack a collection representation into a stream of individual ticks.

    For example, given a ``TS[tuple[int, ...]]`` the values are returned as a time-series of the individual
    elements (in that example, a ``TS[int]``).

    :param ts: The collection representation to unpack.
    :return: A time-series of the values, delivered as a stream of individual ticks.
    """
    # Declaration only: this body contains no implementation.
    ...