from enum import Enum, auto
from typing import Type
from hgraph._types import OUT, TIME_SERIES_TYPE, TS, SCALAR, TIME_SERIES_TYPE_1, TIME_SERIES_TYPE_2
from hgraph._types._scalar_types import Size, SIZE, DEFAULT
from hgraph._types._tsl_type import TSL
from hgraph._wiring._decorators import operator, graph
from hgraph._wiring._wiring_node_class._wiring_node_class import WiringNodeClass
from hgraph._wiring._wiring_port import WiringPort
__all__ = (
"CmpResult",
"DivideByZero",
"abs_",
"accumulate",
"add_",
"and_",
"average",
"bit_and",
"bit_or",
"bit_xor",
"contains_",
"cmp_",
"difference",
"div_",
"divmod_",
"eq_",
"floordiv_",
"ge_",
"getattr_",
"getitem_",
"gt_",
"index_of",
"intersection",
"invert_",
"is_empty",
"le_",
"len_",
"ln",
"lshift_",
"lt_",
"max_",
"mean",
"min_",
"mod_",
"mul_",
"ne_",
"neg_",
"not_",
"or_",
"pos_",
"pow_",
"rshift_",
"setattr_",
"std",
"str_",
"sub_",
"sum_",
"symmetric_difference",
"type_",
"union",
"var",
"zero",
"sign",
)
[docs]
@operator
def add_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE_1) -> DEFAULT[OUT]:
"""
This represents the ``+`` operator for time series types.
"""
...
WiringPort.__add__ = lambda x, y: add_(x, y)
WiringPort.__radd__ = lambda x, y: add_(y, x)
[docs]
@operator
def sub_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE_1) -> DEFAULT[OUT]:
"""
This represents the ``-`` operator for time series types.
"""
...
WiringPort.__sub__ = lambda x, y: sub_(x, y)
WiringPort.__rsub__ = lambda x, y: sub_(y, x)
[docs]
@operator
def mul_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE, **kwargs) -> TIME_SERIES_TYPE:
"""
This represents the ``*`` operator for time series types.
Parameters:
__strict__: bool: (default False)
* if True will return nothing if either lhs or rhs is not valid or present
* if False will return lhs or rhs if rhs or lhs respectively is not present
"""
...
WiringPort.__mul__ = lambda x, y: mul_(x, y)
WiringPort.__rmul__ = lambda x, y: mul_(y, x)
[docs]
class DivideByZero(Enum):
"""For numeric division set the divide_by_zero property"""
ERROR = auto()
NAN = auto()
INF = auto()
NONE = auto()
ZERO = auto()
ONE = auto()
[docs]
@operator
def div_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE, **kwargs) -> DEFAULT[OUT]:
"""
This represents the `/` operator for time series types.
Parameters:
* divide_by_zero: DivideByZero - controls the behaviour when dividing by zero
"""
...
WiringPort.__truediv__ = lambda x, y: div_(x, y)
WiringPort.__rtruediv__ = lambda x, y: div_(y, x)
[docs]
@operator
def floordiv_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE, **kwargs) -> TIME_SERIES_TYPE:
"""
This represents the `//` operator for time series types.
"""
...
WiringPort.__floordiv__ = lambda x, y: floordiv_(x, y)
WiringPort.__rfloordiv__ = lambda x, y: floordiv_(y, x)
[docs]
@operator
def mod_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
This represents the `%` operator for time series types.
"""
...
WiringPort.__mod__ = lambda x, y: mod_(x, y)
WiringPort.__rmod__ = lambda x, y: mod_(y, x)
[docs]
@operator
def divmod_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TSL[TIME_SERIES_TYPE, Size[2]]:
"""
This represents the `divmod` operator for time series types.
(This is defined in Python as the integer division with remainder, i.e. divmod(9, 4) == (2, 1))
"""
...
WiringPort.__divmod__ = lambda x, y: divmod_(x, y)
WiringPort.__rdivmod__ = lambda x, y: divmod_(y, x)
[docs]
@operator
def pow_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
This represents the `**` operator for time series types.
Params:
* divide_by_zero: DivideByZero - controls the behaviour when dividing by zero
"""
...
WiringPort.__pow__ = lambda x, y: pow_(x, y)
WiringPort.__rpow__ = lambda x, y: pow_(y, x)
[docs]
@operator
def lshift_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
This represents the ``<<`` operator for time series types.
"""
...
WiringPort.__lshift__ = lambda x, y: lshift_(x, y)
WiringPort.__rlshift__ = lambda x, y: lshift_(y, x)
[docs]
@operator
def rshift_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
This represents the ``>>`` operator for time series types.
"""
...
WiringPort.__rshift__ = lambda x, y: rshift_(x, y)
WiringPort.__rrshift__ = lambda x, y: rshift_(y, x)
[docs]
@operator
def bit_and(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> DEFAULT[OUT]:
"""
This represents the ``&`` operator for time series types.
"""
...
WiringPort.__and__ = lambda x, y: bit_and(x, y)
WiringPort.__rand__ = lambda x, y: bit_and(y, x)
@operator
def and_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[bool]:
"""
This represents the ``and`` operator for time series types.
This operator does not substitute ``and`` (since that is not possible in Python), but can be used as a functional
equivalent for ``and``.
"""
...
[docs]
@operator
def bit_or(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[TIME_SERIES_TYPE_1]:
"""
This represents the ``|`` operator for time series types.
"""
...
WiringPort.__or__ = lambda x, y: bit_or(x, y)
WiringPort.__ror__ = lambda x, y: bit_or(y, x)
[docs]
@operator
def or_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[bool]:
"""
This represents the ``or`` operator for time series types.
This operator does not substitute ``or`` (since that is not possible in Python), but can be used as a functional
equivalent for ``or``.
"""
...
[docs]
@operator
def bit_xor(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> DEFAULT[OUT]:
"""
This represents the ``^`` operator for time series types.
"""
...
WiringPort.__xor__ = lambda x, y: bit_xor(x, y)
WiringPort.__rxor__ = lambda x, y: bit_xor(y, x)
[docs]
@operator
def eq_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[bool]:
"""
This represents the ``==`` operator for time series types.
"""
...
# This is currently safe to do as the wiring port needs to be immutable, but is never used as a key in a dict or
# compared to another port. But in case we need access to the original store it back on the class.
WiringPort.__orig_eq__ = WiringPort.__eq__
WiringPort.__eq__ = lambda x, y: eq_(x, y)
[docs]
@operator
def ne_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[bool]:
"""
This represents the ``!=`` operator for time series types.
"""
...
WiringPort.__ne__ = lambda x, y: ne_(x, y)
[docs]
@operator
def lt_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[bool]:
"""
This represents the ``<`` operator for time series types.
"""
...
WiringPort.__lt__ = lambda x, y: lt_(x, y)
[docs]
@operator
def le_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[bool]:
"""
This represents the ``<=`` operator for time series types.
"""
...
WiringPort.__le__ = lambda x, y: le_(x, y)
[docs]
@operator
def gt_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[bool]:
"""
This represents the ``>`` operator for time series types.
"""
...
WiringPort.__gt__ = lambda x, y: gt_(x, y)
[docs]
@operator
def ge_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[bool]:
"""
This represents the ``>=`` operator for time series types.
"""
...
WiringPort.__ge__ = lambda x, y: ge_(x, y)
[docs]
@operator
def neg_(ts: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
This represents the unary ``-`` operator for time series types.
"""
...
WiringPort.__neg__ = lambda x: neg_(x)
[docs]
@operator
def pos_(ts: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
This represents the unary ``+`` operator for time series types.
"""
...
WiringPort.__pos__ = lambda x: pos_(x)
[docs]
@operator
def abs_(ts: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
This represents the ``abs`` operator for time series types.
"""
...
WiringPort.__abs__ = lambda x: abs_(x)
[docs]
@operator
def invert_(ts: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""
This represents the unary ``~`` operator for time series types.
"""
...
WiringPort.__invert__ = lambda x: invert_(x)
[docs]
@operator
def contains_(ts: TIME_SERIES_TYPE, item: TS[SCALAR]) -> TS[bool]:
"""
This represents the `in` operator for time series types, however, since ``__contains__`` always returns a bool
value, we can't overload the ``__contains__``, so it is not possible to do ``item in ts``, instead use
``contains_(ts, item)``.
This is logically: ``item in ts``
"""
...
[docs]
@operator
def not_(ts: TIME_SERIES_TYPE) -> TS[bool]:
"""
This represents the unary `not` operator for time series types.
This must be called as ``not_(ts)`` it is not possible to overload the standard ``not`` operator.
"""
...
[docs]
@operator
def getitem_(ts: TIME_SERIES_TYPE, key: TS[SCALAR]) -> TIME_SERIES_TYPE_1:
"""
This represents the ``[]`` operator for time-series types.
Use this as: ``ts[key]``
"""
...
WiringPort.__getitem__ = lambda x, y: getitem_(x, y)
[docs]
@operator
def getattr_(ts: TIME_SERIES_TYPE, attr: str, default_value: SCALAR = None) -> TIME_SERIES_TYPE_1:
"""
This represents the ``.`` operator for time-series types.
Use this as: ``ts.attr`` or more explicitly: ``getattr_(ts, attr)``
"""
...
WiringPort.__getattr__ = lambda x, y: getattr_(x, y)
@operator
def setattr_(ts: OUT, attr: str, value: TIME_SERIES_TYPE_1) -> OUT:
"""
Sets the value on the ``ts`` provided for the ``attr`` to the ``value`` provided.
"""
...
[docs]
@operator
def min_(*ts: TSL[TS[SCALAR], SIZE], default_value: TS[SCALAR] = None, __strict__: bool = True) -> TIME_SERIES_TYPE:
"""
This represents the ``min`` operator for time series types.
* Unary implies the min over the latest TS value for collection types, or running min for non-collection types
In the case of a running sum, a 'reset' signal may be provided, which resets the sum to zero when it ticks
* Binary or multi arg implies item-wise min over all the arguments for collection types,
or the minimum scalar value for scalar types
__strict__ controls whether the operator will tick if any of the arguments are missing
"""
...
[docs]
@operator
def max_(*ts: TSL[TS[SCALAR], SIZE], default_value: TS[SCALAR] = None, __strict__: bool = True) -> TIME_SERIES_TYPE:
"""
The ``max`` operator for time series types.
* Unary implies the max over the latest TS value for collection types, or running max for non-collection types
In the case of a running sum, a 'reset' signal may be provided, which resets the sum to zero when it ticks
* Binary or multi arg implies item-wise max over all the arguments for collection types,
or the maximum scalar value for scalar types
__strict__ controls whether the operator will tick if any of the arguments are missing
"""
...
[docs]
@operator
def sum_(*ts: TSL[TS[SCALAR], SIZE], **kwargs) -> DEFAULT[OUT]:
"""
This represents the ``sum`` operator for time series types, either as a binary or unary operator
* Unary implies the sum over the latest TS value for collection types, or running sum for non-collection types.
In the case of a running sum, a 'reset' signal may be provided, which resets the sum to zero when it ticks
* Binary or multi arg implies item-wise sum over all the arguments for collection types,
or the sum of the scalar value for scalar types
"""
...
[docs]
@operator
def mean(*ts: TSL[TIME_SERIES_TYPE, SIZE]) -> DEFAULT[OUT]:
"""
This represents the ``mean`` operator for time series types
Unary implies the mean over the latest TS value for collection types, or running mean for non-collection types
Binary or multi arg implies item-wise sum over all the arguments for collection types,
or the sum of the scalar value for scalar types
"""
...
[docs]
@operator
def std(*ts: TSL[TIME_SERIES_TYPE, SIZE]) -> DEFAULT[OUT]:
"""
Calculates the standard deviation for time series types
Unary implies the std over the latest TS value for collection types, or running std for non-collection types
Binary or multi arg implies item-wise std over all the arguments for collection types,
or the std of the scalar value for scalar types
"""
...
[docs]
@operator
def var(*ts: TSL[TIME_SERIES_TYPE, SIZE]) -> DEFAULT[OUT]:
"""
Calculates the variance for time series types
Unary implies the var over the latest TS value for collection types, or running var for non-collection types
Binary or multi arg implies item-wise var over all the arguments for collection types,
or the vae of the scalar value for scalar types
"""
...
[docs]
@operator
def zero(tp: Type[TIME_SERIES_TYPE], op: WiringNodeClass) -> TIME_SERIES_TYPE:
"""
This is a helper graph to create a zero time-series (for example, for the reduce function). The zero values are
type and operation dependent, so both are provided. The datatype designers should overload this graph for their
respective data types and return correct zero values for the operation.
"""
...
[docs]
@operator
def len_(ts: TIME_SERIES_TYPE) -> TS[int]:
"""
This represents the `len` operator for time series types.
"""
...
[docs]
@operator
def union(*tsl: TSL[TIME_SERIES_TYPE, SIZE]) -> DEFAULT[OUT]:
"""
Performs a union of the provided time-series values.
Union is { p | p element of tsl[i] for i in range(len(tsl)) }
"""
...
[docs]
@operator
def intersection(*tsl: TSL[TIME_SERIES_TYPE, SIZE]) -> DEFAULT[OUT]:
"""
Performs an intersection of the provided time-series values.
Intersection is { p | p in all tsl[i] for i in range(len(tsl)) }
"""
...
[docs]
@operator
def difference(*tsl: TSL[TIME_SERIES_TYPE, SIZE]) -> DEFAULT[OUT]:
"""
Performs a difference of the provided time-series values.
Difference is { p | p element of lhs and p not element of rhs }
"""
...
[docs]
@operator
def symmetric_difference(*tsl: TSL[TIME_SERIES_TYPE, SIZE]) -> DEFAULT[OUT]:
"""
Performs the symmetric difference of the provided time-series values.
Symmetric difference is { p | p element of union(lhs, rhs), but not element of intersection(lhs, rhs) }
"""
...
[docs]
@operator
def is_empty(ts: TIME_SERIES_TYPE) -> TS[bool]:
"""
Returns True if the value of the time-series is considered empty, False otherwise.
"""
...
[docs]
@operator
def type_(ts: TIME_SERIES_TYPE) -> TS[type]:
"""
Returns the type of the time-series value.
"""
...
@operator
def str_(ts: TIME_SERIES_TYPE) -> TS[str]:
"""
Returns the string representation of the time-series value.
"""
...
# For backwards compatibility. Prefer sum_ to accumulate and mean to average
@graph(deprecated="Prefer sum_")
def accumulate(*ts: TSL[TS[SCALAR], SIZE], default_value: TS[SCALAR] = None) -> DEFAULT[OUT]:
if default_value is not None:
raise NotImplementedError(f"Accumulate is not implemented for {type(default_value)}")
return sum_(*ts)
[docs]
@graph(deprecated="Prefer mean")
def average(*ts: TSL[TIME_SERIES_TYPE, SIZE]) -> DEFAULT[OUT]:
return mean(*ts)
[docs]
class CmpResult(Enum):
LT = -1
EQ = 0
GT = 1
[docs]
@operator
def cmp_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TS[CmpResult]:
"""
Return one of LT, EQ, GT as a comparison result.
This could be more efficient than performing a sequence of operations.
"""
...
[docs]
@operator
def index_of(ts: TIME_SERIES_TYPE_1, item: TIME_SERIES_TYPE_2) -> TS[int]:
"""
Returns the index of a value within the ts provided.
Options include:
TS[tuple[SCALAR, ...]]
returns the index of the first occurrence of the item in the tuple
TSL[TIME_SERIES_TYPE_2, SIZE]
returns the index of the first occurrence of the item in the TSL
"""
...
[docs]
@operator
def ln(ts: TS[float]) -> TS[float]:
"""The natural logarithm of the time-series value"""
...
@operator
def sign(ts: TS[TIME_SERIES_TYPE]) -> TS[TIME_SERIES_TYPE]:
"""The sign of the time-series value"""
...