import sys
from datetime import timedelta, datetime
from typing import TypeVar, Generic, Tuple, Callable
from hgraph._types import (
TS,
TIME_SERIES_TYPE,
SIGNAL,
SCALAR,
TSB,
TimeSeriesSchema,
TSW,
WINDOW_SIZE,
WINDOW_SIZE_MIN,
TS_SCHEMA,
)
from hgraph._wiring._decorators import operator
__all__ = (
"to_window",
"sample",
"lag",
"schedule",
"resample",
"dedup",
"filter_",
"until_true",
"freeze",
"throttle",
"INT_OR_TIME_DELTA",
"take",
"drop",
"window",
"WindowResult",
"gate",
"batch",
"step",
"slice_",
"drop_dups",
)
INT_OR_TIME_DELTA = TypeVar("INT_OR_TIME_DELTA", int, timedelta)
[docs]
@operator
def sample(signal: SIGNAL, ts: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
Snaps the value of a time series (ts) on a tick from another time series (signal).
"""
...
[docs]
@operator
def lag(ts: TIME_SERIES_TYPE, period: INT_OR_TIME_DELTA, on_wall_clock: bool = False) -> TIME_SERIES_TYPE:
"""
Delays the delivery of an input by the period specified. This period can either be a number of ticks
or a time-delta. In the case of a timedelta, on_wall_clock determines if the schedule is on the wall clock or
on engine time
When a time-delta is specified the value will be scheduled to be delivered at the receipt time + period.
"""
...
[docs]
@operator
def schedule(
delay: timedelta,
*,
start: datetime = None,
initial_delay: bool = True,
max_ticks: int = sys.maxsize,
use_wall_clock: bool = False,
) -> TS[bool]:
"""
Generates regular ticks in the graph that tick at the specified delay. For example,
``schedule(timedelta(seconds=3))`` will produce a time series of type TS[bool] that will tick True every three
seconds. The initial_delay parameter specifies whether the first tick should be delayed by the delay time or not and
max_ticks specifies the maximum number of ticks to produce. on_wall_clock indicates if the wall clock time is to
be used for the schedule. The default is to use engine time.
"""
...
[docs]
@operator
def resample(ts: TIME_SERIES_TYPE, period: timedelta) -> TIME_SERIES_TYPE:
"""
Resamples the time series to tick at the specified period. For example, ``resample(ts, timedelta(seconds=3))`` will
produce a time series of ts that ticks every three seconds. This will always tick at the specified delay, even if
the input time series does not tick.
"""
...
[docs]
@operator
def dedup(ts: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
Drops duplicate values from a time-series.
"""
...
# For backwards compatibility. Prefer dedup
drop_dups = dedup
[docs]
@operator
def filter_(condition: TS[bool], ts: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
Suppresses ticks of a time series when the condition time series' value is False
"""
...
@operator
def until_true(predicate: Callable[[SCALAR], bool], ts: TS[SCALAR]) -> TS[bool]:
"""
Emits False until ``predicate`` first returns True, then emits True and passivates ``ts``.
"""
...
@operator
def freeze(predicate: Callable[[SCALAR], bool], ts: TS[SCALAR], ) -> TS[SCALAR]:
"""
Forwards ``ts`` until ``predicate`` first returns True, then passivates ``ts`` so it is no longer evaluated.
"""
...
@operator
def filter_by(
ts: TIME_SERIES_TYPE, expr: Callable[[TIME_SERIES_TYPE, ...], bool], **kwargs: TSB[TS_SCHEMA]
) -> TIME_SERIES_TYPE:
"""
Filters the ``ts`` time-series using the expression provided.
If the expression requires additional inputs they can be supplied as keyword arguments.
:param ts: The time-series to filter.
:param expr: The expression used to filter the time-series.
:param kwargs: Any additional arguments to supply to the expression.
:return: The filtered time-series.
"""
...
[docs]
@operator
def throttle(
ts: TIME_SERIES_TYPE, period: TS[timedelta], delay_first_tick: bool = False, use_wall_clock: bool = False
) -> TIME_SERIES_TYPE:
"""
Reduces the rate of ticks in a time series to the given period. It works like ``resample`` if the rate of ticks is
higher than the period but unlike ``resample`` does not produce ticks when the source time series does not tick.
"""
...
[docs]
@operator
def take(ts: TIME_SERIES_TYPE, count: INT_OR_TIME_DELTA) -> TIME_SERIES_TYPE:
"""
filters out all ticks the input time series after ``count`` initial ticks or the given time series
"""
...
[docs]
@operator
def drop(ts: TIME_SERIES_TYPE, count: INT_OR_TIME_DELTA) -> TIME_SERIES_TYPE:
"""
Drops the first `count` ticks and then returns the remainder of the ticks
"""
...
[docs]
class WindowResult(TimeSeriesSchema, Generic[SCALAR]):
buffer: TS[tuple[SCALAR, ...]]
index: TS[tuple[datetime, ...]]
@operator(deprecated="Prefer to_window")
def window(ts: TS[SCALAR], period: INT_OR_TIME_DELTA, min_window_period: INT_OR_TIME_DELTA = None) -> TSB[WindowResult]:
"""
Buffers the time-series. Emits a tuple of values representing the elements in the buffer.
and a tuple of corresponding time-stamps representing the time-points at which the elements
in the buffer correspond.
When the window is an int, a cyclic buffer is created. If the window is a timedelta, then
a deque is used to buffer the elements.
Note with time-deltas the buffer will contain at most the elements that fit within the window, so
if you have 3 ticks at 1 microsecond intervals, and a window of 3 milliseconds, then the buffer will
not be full until the 4th tick.
"""
...
[docs]
@operator
def to_window(
ts: TS[SCALAR], period: INT_OR_TIME_DELTA, min_window_period: INT_OR_TIME_DELTA = None
) -> TSW[SCALAR, WINDOW_SIZE, WINDOW_SIZE_MIN]:
"""
Converts the time-series to a time-series window.
When the window is an int, a cyclic buffer is created. If the window is a timedelta, then
a deque is used to buffer the elements.
Note with time-deltas the buffer will contain at most the elements that fit within the window, so
if you have 3 ticks at 1 microsecond intervals, and a window of 3 milliseconds, then the buffer will
not be full until the 4th tick.
"""
...
[docs]
@operator
def gate(condition: TS[bool], ts: TIME_SERIES_TYPE, buffer_length: int = sys.maxsize) -> TIME_SERIES_TYPE:
"""
Queues up ticks of a time series when the value of ``condition`` is ``False``. Once it turns `True` the queued-up
ticks are released one by one. The default buffer length is sys.maxsize (which is roughly the same as unbounded).
A ``RuntimeError`` is raised if the buffer exceeds the given buffer_length (when set to a positive value).
Setting the length to -1 will put the gate in last tick mode. In this mode the values are overwritten whilst the
process is gating, once it has finished gating, the last modified value is released, and then ticks are processed
as usual.
"""
...
[docs]
@operator
def batch(condition: TS[bool], ts: TS[SCALAR], delay: timedelta, buffer_length: int) -> TS[Tuple[SCALAR, ...]]:
"""
Queues up ticks of a time series when the value of ``condition`` if ``False``. Once it turns `True` the queued up
ticks are released in batches with a given delay between each batch. A ``RuntimeError`` is raised if the buffer
exceeds the given buffer_length.
"""
...
[docs]
@operator
def step(ts: TIME_SERIES_TYPE, step_size: int) -> TIME_SERIES_TYPE:
"""
Steps the time series by the specified number of ticks. This will skip ticks in the time series.
"""
...
[docs]
@operator
def slice_(ts: TIME_SERIES_TYPE, start: INT_OR_TIME_DELTA, stop: INT_OR_TIME_DELTA, step_size: int) -> TIME_SERIES_TYPE:
"""
Slices the time series from the start to the stop index stepped by the step size. Essentially combines ``drop``,
``take`` and ``step`` into one operation. It works like python's slice operator but along the ticks or time axis.
"""
...