Decorators

The core HGraph decorators.

operator(fn=None, deprecated: bool | str = False)[source]
Overloads:
  • fn (GRAPH_SIGNATURE) → GRAPH_SIGNATURE

  • deprecated (bool | str) → Callable[[GRAPH_SIGNATURE], GRAPH_SIGNATURE]

Used to define a name and partial signature of a graph operator. A graph operator is a function that operates on one or more time-series values, typically producing a time-series value.

An operator cannot have an implementation and requires an override by a relevant graph node instance.

For example:

@operator
def add_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
    ...

@compute_node(overloads=add_)
def add_ts(lhs: TS[NUMBER], rhs: TS[NUMBER]) -> TS[NUMBER]:
    ...

In this case, we define a generic add_ operator; this is then overloaded to implement the operator over two TS values.

Note that with an operator, the exact signature is not enforced; it is provided to better support type hinting. As with C++ templates, an implementation can override the exact signature as required.

Thus, it is possible to perform the following override:

@compute_node(overloads=add_)
def add_ts_date(lhs: TS[date], rhs: TS[timedelta]) -> TS[date]:
    ...

The overload mechanism attempts to match the provided inputs to the implementation that is the closest fit.
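
The idea of a closest fit can be illustrated with a small standalone sketch. This is not HGraph's resolution algorithm, which the text above does not describe; the registry, the `overload` and `dispatch` helpers, and the ranking by inheritance distance are all assumptions made for illustration.

```python
from datetime import date, timedelta
from typing import Callable, Optional

# Hypothetical registry: operator name -> list of (declared types, implementation).
_OVERLOADS: dict[str, list[tuple[tuple[type, ...], Callable]]] = {}


def overload(name: str, *types: type):
    """Register fn as an implementation of the named operator."""
    def decorator(fn: Callable) -> Callable:
        _OVERLOADS.setdefault(name, []).append((types, fn))
        return fn
    return decorator


def _distance(actual: type, declared: type) -> Optional[int]:
    """Steps up the MRO from actual to declared, or None if unrelated."""
    mro = actual.__mro__
    return mro.index(declared) if declared in mro else None


def dispatch(name: str, *args):
    """Call the registered implementation with the smallest total distance."""
    best, best_rank = None, None
    for types, fn in _OVERLOADS.get(name, []):
        if len(types) != len(args):
            continue
        ranks = [_distance(type(a), t) for a, t in zip(args, types)]
        if any(r is None for r in ranks):
            continue  # this candidate does not accept the arguments
        rank = sum(ranks)
        if best_rank is None or rank < best_rank:
            best, best_rank = fn, rank
    if best is None:
        raise TypeError(f"no overload of {name} matches the arguments")
    return best(*args)


@overload("add_", object, object)
def add_generic(lhs, rhs):
    return ("generic", lhs + rhs)


@overload("add_", date, timedelta)
def add_date(lhs, rhs):
    return ("date", lhs + rhs)
```

Here `dispatch("add_", 1, 2)` falls back to the generic implementation, while a `date` and a `timedelta` select the more specific one.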

All overloads need to be imported to work, so when overloading an operator it is important to make sure the overload has been imported before the operator is used. This can be done by including the overloads in an __init__ (or chain thereof), so that importing the top-level package ensures all the children are imported.

Parameters:
  • fn – The function to wrap

  • deprecated (bool | str) – Marks the node as no longer supported and likely to be removed shortly

Returns:

The wrapped function.

graph(fn=None, overloads=None, resolvers=None, requires=None, label=None, deprecated: bool | str = False)[source]
Overloads:
  • fn (GRAPH_SIGNATURE) → GRAPH_SIGNATURE

  • fn (None), overloads (WiringNodeClass | GRAPH_SIGNATURE), resolvers (Mapping[TypeVar, Callable]), requires (Callable[[…, …], bool]), label (str | None), deprecated (bool | str) → Callable[[GRAPH_SIGNATURE], GRAPH_SIGNATURE]

The graph decorator represents a function that contains wiring logic. Wiring logic is only evaluated once when the graph is created and is used to indicate which nodes to construct and how to connect the input and outputs of the nodes (or the edges of the graph). It is important to note that the logic of the function does not do any work and merely describes the shape of the runtime graph. A graph can take time-series inputs and can return a time-series value, but this is not a requirement. Typically, the main graph will only take scalar value inputs for configuration (or take no inputs at all).

@graph
def my_graph():
    c = const(1)
    debug_print('c', c)

This is the smallest meaningful graph.
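
The separation between wiring and evaluation can be sketched without HGraph at all. The toy below is an assumption-laden illustration, not the library's runtime: `ToyNode`, `toy_graph` and the `wiring_calls` counter are hypothetical names. It shows the wiring function running once to describe the shape, after which only the nodes do work.

```python
from typing import Callable


class ToyNode:
    """A runtime node: a function plus the upstream nodes that feed it."""

    def __init__(self, fn: Callable, *inputs: "ToyNode"):
        self.fn = fn
        self.inputs = inputs

    def evaluate(self, tick: int):
        if not self.inputs:
            return self.fn(tick)  # source node: derives its value from the tick
        return self.fn(*(n.evaluate(tick) for n in self.inputs))


wiring_calls = 0


def toy_graph() -> ToyNode:
    """Wiring logic: describes which nodes exist and how they connect."""
    global wiring_calls
    wiring_calls += 1
    source = ToyNode(lambda tick: tick)
    doubled = ToyNode(lambda v: v * 2, source)
    return doubled


root = toy_graph()                               # wiring runs exactly once
results = [root.evaluate(t) for t in range(3)]   # evaluation runs per tick
```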

The graph signature, whilst more flexible, takes the same form as that of a source, compute or sink node. A common design principle is to describe behaviour with graphs initially and then convert to the appropriate nodes once the logic is sufficiently decomposed. It is also possible to re-code a node as a graph, for example:

@compute_node
def a_plus_b_plus_c(a: TS[float], b: TS[float], c: TS[float]) -> TS[float]:
    return a.value + b.value + c.value

Can be reworked to be:

@graph
def a_plus_b_plus_c(a: TS[float], b: TS[float], c: TS[float]) -> TS[float]:
    return a+b+c

Or vice-versa. The trade-off is that fewer compute nodes are typically faster to evaluate, but graphs are far better at re-using existing components. The preference should always be to use graph logic before constructing node functions.

Returns:

The wrapped function

compute_node(fn=None, /, node_impl=None, active: Sequence[str] | Callable = None, valid: Sequence[str] | Callable = None, all_valid: Sequence[str] | Callable = None, overloads: WiringNodeClass | COMPUTE_NODE_SIGNATURE = None, resolvers: Mapping[Type, Callable] = None, requires: Callable = None, label: str | None = None, deprecated: bool | str = False)[source]
Overloads:
  • fn (COMPUTE_NODE_SIGNATURE) → COMPUTE_NODE_SIGNATURE

  • fn (None), node_impl (Any), active (Sequence[str] | Callable), valid (Sequence[str] | Callable), all_valid (Sequence[str] | Callable), overloads (WiringNodeClass | COMPUTE_NODE_SIGNATURE), resolvers (Mapping[Type, Callable]), requires (Callable[[…, …], bool]), label (str | None), deprecated (bool | str) → Callable[[COMPUTE_NODE_SIGNATURE], COMPUTE_NODE_SIGNATURE]

Marks a function as being a compute node in the graph. A compute node accepts one or more time-series inputs and returns a time-series value. The compute node performs work whenever an input (marked as active) is modified.

Example:

@compute_node
def add_ts(lhs: TS[NUMBER], rhs: TS[NUMBER]) -> TS[NUMBER]:
    return lhs.value + rhs.value

In this example, the code accepts two time-series inputs and returns the sum of the two input values.

A compute node can also define some useful parameters to describe the initial state, perform input validations, automatically determine type-signatures, etc.

The active, valid, and all_valid parameters control how the compute node being defined will be marked for evaluation.

The active parameter lists the input names that should be marked as active. If no list is provided, all inputs are marked as active.

For example:

@compute_node(active=('trade_request',))
def accept_trade_request(trade_request: TS[Trade], market_data: TS[float]) -> TS[bool]:
    return trade_request.value.price == market_data.value

In the above example, we only accept trade requests if the price on the request is exactly the same as the market_data. In this scenario, we only care to respond to trade requests and not to market data. Marking only trade_request as active implies that market_data is passive; in other words, a change in the market data will not activate the logic. Marking an input passive does not mean the value will be out of date. The value is always up to date; being passive merely ensures the function is not activated when that input is modified.
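
The active/passive behaviour can be mimicked with a small standalone class. This is a sketch under stated assumptions rather than HGraph's scheduler: `ToyComputeNode` and its `tick` method are hypothetical, and the toy evaluates only once every input has received a value, mirroring the default valid guard.

```python
from typing import Any, Callable, Optional, Sequence


class ToyComputeNode:
    """Stores every input value, but evaluates only when an active input ticks."""

    def __init__(self, fn: Callable[..., Any], inputs: Sequence[str],
                 active: Optional[Sequence[str]] = None):
        self.fn = fn
        self.inputs = tuple(inputs)
        # With no explicit list, every input is treated as active.
        self.active = set(inputs if active is None else active)
        self.values: dict = {}
        self.outputs: list = []

    def tick(self, name: str, value: Any) -> None:
        self.values[name] = value  # passive inputs are still kept up to date
        all_set = all(n in self.values for n in self.inputs)
        if name in self.active and all_set:
            self.outputs.append(self.fn(**self.values))


node = ToyComputeNode(
    lambda trade_request, market_data: trade_request == market_data,
    inputs=("trade_request", "market_data"),
    active=("trade_request",),
)
node.tick("market_data", 100.0)    # passive: stored, no evaluation
node.tick("market_data", 101.0)    # passive: stored, no evaluation
node.tick("trade_request", 101.0)  # active: compared with the latest market data
node.tick("trade_request", 100.0)  # active: evaluated again
```

Only the two trade_request ticks produce an output, and each one sees the most recent market data value.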

valid works in a similar way to active: if it is not set, all inputs are required to be valid before the function will be called. If it is set, only the inputs that are listed are included in the guard. In that case, the function itself is required to test the validity of any input that is not explicitly listed.

all_valid, when not specified, defaults to not requiring any input to be all valid. For collection time-series values such as TSB, TSD, and TSL, the collection is marked as valid if any of its elements is valid. However, there are times when you want to ensure all elements of a collection are valid; in this case, list the inputs in the all_valid clause. This ensures the function is only evaluated when this state is true.
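
The difference between the two guards on a collection comes down to "any element" versus "every element". The sketch below models a collection as a plain dict in which `None` stands for an element that has not ticked yet; that representation and the helper names are assumptions, not HGraph types.

```python
from typing import Mapping, Optional


def is_valid(collection: Mapping[str, Optional[float]]) -> bool:
    """Valid as soon as at least one element holds a value."""
    return any(v is not None for v in collection.values())


def is_all_valid(collection: Mapping[str, Optional[float]]) -> bool:
    """All-valid only when every element holds a value."""
    return all(v is not None for v in collection.values())


partial = {"bid": 99.5, "ask": None}    # only one element has ticked
complete = {"bid": 99.5, "ask": 100.5}  # every element has ticked
```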

The overloads argument allows the node to be marked as implementing an operator; see the operator() decorator for more information.

Resolvers allow the user to provide additional logic to determine the resolution of a TypeVar. The resolvers argument is a dictionary mapping each type var to be resolved to a function that is able to resolve the type. For example:

def _resolve_type(mapping: dict[TypeVar, type], scalars: dict[str, Any]) -> type:
    schema = mapping[TS_SCHEMA]  # resolved as it is an input in to the node
    attr = scalars['attr']
    return schema.__meta_data_schema__[attr].value_type.py_type

@compute_node(resolvers={SCALAR: _resolve_type})
def get_attr_tsb(ts: TSB[TS_SCHEMA], attr: str) -> TS[SCALAR]:
    ...

In the above example, the _resolve_type function gets the resolved schema, extracts out the type of the attr and returns the type, which is the resolution of the SCALAR type var.
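
The same pattern can be tried out in plain Python. The sketch below is an illustration only: `PriceSchema`, `resolve_attr_type` and `resolve_all` are hypothetical, the schema is an ordinary annotated class rather than an HGraph schema, and the order in which HGraph invokes resolvers is not something the text above specifies.

```python
from typing import Any, Callable, Dict, TypeVar, get_type_hints

TS_SCHEMA = TypeVar("TS_SCHEMA")
SCALAR = TypeVar("SCALAR")


class PriceSchema:
    """Stand-in schema: attribute names mapped to their value types."""
    bid: float
    size: int


def resolve_attr_type(mapping: Dict[TypeVar, type], scalars: Dict[str, Any]) -> type:
    schema = mapping[TS_SCHEMA]                      # already resolved from the inputs
    return get_type_hints(schema)[scalars["attr"]]   # type of the named attribute


def resolve_all(resolvers: Dict[TypeVar, Callable],
                mapping: Dict[TypeVar, type],
                scalars: Dict[str, Any]) -> Dict[TypeVar, type]:
    """Fill in every type var that has a resolver and is not yet resolved."""
    resolved = dict(mapping)
    for type_var, resolver in resolvers.items():
        if type_var not in resolved:
            resolved[type_var] = resolver(resolved, scalars)
    return resolved


resolution = resolve_all(
    {SCALAR: resolve_attr_type},
    {TS_SCHEMA: PriceSchema},
    {"attr": "bid"},
)
```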

The requires argument is similar to the resolver argument, but only takes a single function whose responsibility is to determine if the provided inputs meet with the requirements of the node. For example:

def _requires_enum_values(mapping: dict[TypeVar, type], scalars: dict[str, Any]) -> bool:
    # Reject any rw_flags value outside the permitted set.
    if scalars['rw_flags'] not in ('r', 'w', 'rw'):
        raise ValueError("rw_flags must be one of 'r', 'w' or 'rw'")
    return True  # requires is typed as returning bool

@compute_node(requires=_requires_enum_values)
def some_func(ts: TS[float], rw_flags: str) -> TS[float]:
    ...

In the above example, the requires function ensures the provided rw_flags string is one of the permitted values.
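
A requires check is just a predicate over the resolved types and the scalar arguments. The following standalone sketch shows one being applied before a node is accepted; `wire_node` is a hypothetical caller, and how HGraph itself reports a failed requirement is not covered by the text above.

```python
from typing import Any, Callable, Dict


def requires_rw_flags(mapping: Dict[Any, type], scalars: Dict[str, Any]) -> bool:
    """True when rw_flags is one of the permitted values."""
    return scalars["rw_flags"] in ("r", "w", "rw")


def wire_node(requires: Callable[[Dict[Any, type], Dict[str, Any]], bool],
              mapping: Dict[Any, type],
              scalars: Dict[str, Any]) -> str:
    """Hypothetical wiring step: refuse to proceed when requirements fail."""
    if not requires(mapping, scalars):
        raise ValueError("requirements not met")
    return "wired"


accepted = wire_node(requires_rw_flags, {}, {"rw_flags": "rw"})
```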

Parameters:
  • fn – The function to wrap

  • node_impl – The node implementation to use (this makes fn a signature only method)

  • active (Union[Sequence[str], Callable]) – Which inputs to mark as being active (by default all are active)

  • valid (Union[Sequence[str], Callable]) – Which inputs to require to be valid (by default all are valid)

  • all_valid (Union[Sequence[str], Callable]) – Which inputs are required to be all_valid (by default none are all_valid)

  • overloads (Union[WiringNodeClass, TypeVar(COMPUTE_NODE_SIGNATURE, bound= Callable)]) – If this node overloads an operator, this is the operator it is designed to overload.

  • resolvers (Mapping[Type, Callable]) – A resolver method to assist with resolving types when they can be inferred but not deduced directly.

  • requires (Callable) – Callable which accepts the mapping and scalars as parameters and validates the inputs meet with the requirements defined in the function.

  • deprecated (bool | str) – Marks the node as no longer supported and likely to be removed shortly

sink_node(fn=None, /, node_impl=None, active=None, valid=None, all_valid=None, overloads=None, resolvers=None, requires=None, label=None, deprecated: bool | str = False)[source]
Overloads:
  • fn (SINK_NODE_SIGNATURE) → SINK_NODE_SIGNATURE

  • fn (None), node_impl (Any), active (Sequence[str] | Callable), valid (Sequence[str] | Callable), all_valid (Sequence[str] | Callable), overloads (WiringNodeClass | SINK_NODE_SIGNATURE), resolvers (Mapping[Type, Callable]), requires (Callable[[…, …], bool]), label (str | None), deprecated (bool | str) → Callable[[SINK_NODE_SIGNATURE], SINK_NODE_SIGNATURE]

A sink node is a node in the graph that accepts one or more time-series inputs and produces no output. These nodes are leaf nodes of the graph and generally the only nodes in the graph that we expect to have side effects. Examples of sink nodes include: writing to the output stream, network, database, etc.

@sink_node
def print_(format_str: str, value: TS[SCALAR]):
    print(format_str.format(value.value))

The remaining arguments are the same as described in the compute_node() decorator.

Parameters:
  • fn – The function to wrap

  • node_impl – The node implementation to use (this makes fn a signature only method)

  • active – Which inputs to mark as being active (by default all are active)

  • valid – Which inputs to require to be valid (by default all are valid)

  • all_valid – Which inputs are required to be all_valid (by default none are all_valid)

  • overloads – If this node overloads an operator, this is the operator it is designed to overload.

  • resolvers – A resolver method to assist with resolving types when they can be inferred but not deduced directly.

  • requires – Callable which accepts the mapping and scalars as parameters and validates the inputs meet with the requirements defined in the function.

  • deprecated (bool | str) – Marks the node as no longer supported and likely to be removed shortly

generator(fn=None, overloads=None, resolvers=None, requires=None, label=None, deprecated: bool | str = False)[source]
Overloads:
  • fn (SOURCE_NODE_SIGNATURE) → SOURCE_NODE_SIGNATURE

  • fn (None), overloads (WiringNodeClass | GRAPH_SIGNATURE), resolvers (Mapping[TypeVar, Callable]), requires (Callable[[…, …], bool]), label (str | None), deprecated (bool | str) → Callable[[SOURCE_NODE_SIGNATURE], SOURCE_NODE_SIGNATURE]

Creates a pull source node that supports generating a sequence of ticks that will be fed into the graph. The generator wraps a function that is implemented as a python generator which returns a tuple of time (or timedelta) and value.

For example:

@generator
def signal() -> TS[bool]:
    while True:
        yield (timedelta(milliseconds=1), True)

This will cause an infinite sequence of ticks (with a value of True) that will tick once a millisecond.

The generator will fetch the first tick during the start life-cycle of the node. If no tick is returned, the generator WILL do NOTHING.
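
The shape of the wrapped function can be explored as an ordinary Python generator. In the sketch below, `schedule` is a hypothetical consumer and the choice to treat each timedelta as an offset from the previous tick is an assumption; the text above does not say how HGraph interprets the delay.

```python
from datetime import datetime, timedelta
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def signal() -> Iterator[Tuple[timedelta, bool]]:
    """Plain generator with the same body as the decorated example."""
    while True:
        yield (timedelta(milliseconds=1), True)


def schedule(ticks: Iterable[Tuple[timedelta, bool]],
             start: datetime, limit: int) -> List[Tuple[datetime, bool]]:
    """Turn (timedelta, value) pairs into absolute (time, value) pairs."""
    now = start
    out = []
    for delay, value in islice(ticks, limit):  # islice bounds the infinite stream
        now += delay                           # assumed: delay is relative to the last tick
        out.append((now, value))
    return out


start = datetime(2024, 1, 1)
first_three = schedule(signal(), start, 3)
```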

push_queue(tp: type[TIME_SERIES_TYPE], overloads: WiringNodeClass | SOURCE_NODE_SIGNATURE = None, resolvers: Mapping[TypeVar, Callable] = None, requires: Callable[[..., ...], bool] = None, label: str | None = None, deprecated: bool | str = False)[source]

Creates a push source node that supports injecting values into the graph asynchronously. The function wrapped by this decorator will be called as a start lifecycle method. The function must take as its first parameter the sender callable. It is possible to take additional scalar values that will be provided as kwargs.

For example,

@push_queue(TS[bool])
def my_message_sender(sender: Callable[[SCALAR], None]):
    ...
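
The role of the sender callable can be shown with the standard library alone. This sketch is not how HGraph delivers pushed values: the queue stands in for the graph's inbound side, and `start_sender` is a hypothetical start-lifecycle function that hands the sender to a background thread.

```python
import queue
import threading
from typing import Callable, Iterable


def start_sender(sender: Callable[[bool], None],
                 messages: Iterable[bool]) -> threading.Thread:
    """Start a background thread that pushes each message via sender."""
    def run() -> None:
        for message in messages:
            sender(message)

    worker = threading.Thread(target=run)
    worker.start()
    return worker


inbox: "queue.Queue[bool]" = queue.Queue()  # stand-in for the graph's inbound queue
worker = start_sender(inbox.put, [True, False, True])
worker.join()                               # wait until every value has been pushed
received = [inbox.get_nowait() for _ in range(3)]
```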

const_fn(fn=None, overloads=None, resolvers=None, requires=None, label=None, deprecated: bool | str = False)[source]
Overloads:
  • fn (SOURCE_NODE_SIGNATURE) → SOURCE_NODE_SIGNATURE

  • fn (None), overloads (WiringNodeClass | GRAPH_SIGNATURE), resolvers (Mapping[TypeVar, Callable]), requires (Callable[[…, …], bool]), label (str | None), deprecated (bool | str) → Callable[[SOURCE_NODE_SIGNATURE], SOURCE_NODE_SIGNATURE]

Wraps a constant function. A constant function is one that accepts non-time-series inputs and produces a constant value. The function returns a value that matches the scalar representation of the time-series output. The side effect of using this decorator is that the value can be called in the context of a graph or as a normal function. This can still benefit from operator resolution as well.

For example:

@const_fn
def my_const(a: int, b: int) -> TS[int]:
    return a + b

@graph
def my_graph():
    a = my_const(1, 2) # Used inside the graph wiring.
    if a.value == 3:
        debug_print('a', a)
    else:
        raise RuntimeError("Bad things happening")

print(f"1+2={my_const(1, 2).value}")  # Used outside the graph wiring

reference_service(fn=None, resolvers=None)[source]
Overloads:
  • fn (SERVICE_DEFINITION) → SERVICE_DEFINITION

  • resolvers (Mapping[‘TypeVar’, Callable]) → Callable[[SERVICE_DEFINITION], SERVICE_DEFINITION]

A reference service is a service that only produces a value that does not vary by request. The pattern for a reference service is the same as for a source node.

For example:

@reference_service
def my_reference_service(path: str | None) -> OUT_TIME_SERIES:
    ...

If path is not provided or defined in the configuration, it is assumed there will only be one bound instance, and that instance will be bound to the path ‘ref_svc://<module>.<svc_name>’, for example: ‘ref_svc://a.b.c.my_reference_service’.
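
The default path convention can be written out as a small helper. Both functions below are hypothetical and only restate the naming rule described above; they say nothing about how HGraph looks up the configuration or performs the binding.

```python
from typing import Optional


def default_reference_path(module: str, svc_name: str) -> str:
    """Build the default path from the module and service names."""
    return f"ref_svc://{module}.{svc_name}"


def bound_path(path: Optional[str], module: str, svc_name: str) -> str:
    """Use the explicit path when given, otherwise fall back to the default."""
    return path if path is not None else default_reference_path(module, svc_name)


implicit = bound_path(None, "a.b.c", "my_reference_service")
explicit = bound_path("ref_svc://custom", "a.b.c", "my_reference_service")
```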

The implementation needs to be registered by the outer wiring node. If it is not registered, a remote instance of the service is looked for to bind to.

subscription_service(fn=None, resolvers=None)[source]
Overloads:
  • fn (SERVICE_DEFINITION) → SERVICE_DEFINITION

  • resolvers (Mapping[‘TypeVar’, Callable]) → Callable[[SERVICE_DEFINITION], SERVICE_DEFINITION]

A subscription service is a service where the input receives a subscription key and then streams back results. This looks like:

default=None

@subscription_service
def my_subscription_svc(path: str | None, ts1: TS[str]) -> TS[float]:
    ...

@service_impl(interfaces=my_subscription_svc)
def my_subscription_svc_impl(ts1: TSD[RequesterId, TS[str]]) -> TSD[RequesterId, TS[float]]:
    ...

@graph
def my_code():
    register_service(default, my_subscription_svc_impl)
    ...
    out = my_subscription_svc(default, ts1="mkt_data.mcu_3m")

request_reply_service(fn=None, resolvers=None)[source]
Overloads:
  • fn (SERVICE_DEFINITION) → SERVICE_DEFINITION

  • resolvers (Mapping[‘TypeVar’, Callable]) → Callable[[SERVICE_DEFINITION], SERVICE_DEFINITION]

A request-reply service takes a request and returns a response, error or time-out. For example:

class ReqRepResponse(Generic[TIME_SERIES_TYPE_1]):
    result: TIME_SERIES_TYPE_1
    time_out: TS[bool]
    error: TS[str]

@request_reply_service
def my_request_reply(path: str | None, request: TIME_SERIES_TYPE) -> TSB[ReqRepResponse[TIME_SERIES_TYPE_1]]:
    ...

service_impl(fn: GRAPH_SIGNATURE = None, *, interfaces: Sequence[SERVICE_DEFINITION] | SERVICE_DEFINITION = None, resolvers: Mapping[TypeVar, Callable] = None, deprecated: bool | str = False)[source]
Overloads:
  • fn (GRAPH_SIGNATURE) → GRAPH_SIGNATURE

  • interfaces (Sequence[SERVICE_DEFINITION] | SERVICE_DEFINITION), resolvers (Mapping[‘TypeVar’, Callable]), deprecated (bool | str) → Callable[[GRAPH_SIGNATURE], GRAPH_SIGNATURE]

Wrap a service implementation.

Bare @service_impl defines the reusable inner graph. @service_impl(interfaces=...) binds that graph to one or more service interfaces.

adaptor(fn=None, resolvers=None)[source]
Overloads:
  • fn (SERVICE_DEFINITION) → SERVICE_DEFINITION

  • resolvers (Mapping[‘TypeVar’, Callable]) → Callable[[SERVICE_DEFINITION], SERVICE_DEFINITION]

@adaptor
def my_interface(ts1: TIME_SERIES, ...) -> OUT_TIME_SERIES:
    pass

@adaptor_impl(interfaces=my_interface)
def my_adaptor(ts1: TIME_SERIES, ...) -> OUT_TIME_SERIES:
    pass

This is a client interface for a single client adaptor. An adaptor is a graph pattern primarily used to define connectivity from graph code to the outside world.

Usage of the adaptor is typically done using the new accessors:

  • from_graph(path=…, …)

  • to_graph(path=…, …)

This allows us to use the service in different orders to the defined order.

The path must be unique per usage of the adaptor, as there is only one instance associated with each path.

adaptor_impl(*, interfaces: Sequence[SERVICE_DEFINITION] | SERVICE_DEFINITION = None, resolvers: Mapping[TypeVar, Callable] = None, deprecated: bool | str = False)[source]

Wraps an adaptor implementation.

service_adaptor(fn=None, resolvers=None)[source]
Overloads:
  • fn (SERVICE_DEFINITION) → SERVICE_DEFINITION

  • resolvers (Mapping[‘TypeVar’, Callable]) → Callable[[SERVICE_DEFINITION], SERVICE_DEFINITION]

@service_adaptor
def my_interface(ts1: TIME_SERIES, ...) -> OUT_TIME_SERIES:
    pass

@service_adaptor_impl(interfaces=my_interface)
def my_adaptor(ts1: TIME_SERIES, ...) -> OUT_TIME_SERIES:
    pass

Service adaptor is a multi-client version of adaptor. It works in a similar way to the request-reply service: every client on the graph gets an integer id, and all client requests are combined into a TSD keyed by those ids. Replies from the adaptor are expected to be keyed by the same ids so that they can be delivered to the correct client.

Note

This decorator is temporary. The plan is to make a common service interface decorator that will work for both request-reply services and multi-client adaptors, with compatible implementations, so that even the same service with different paths can be implemented as a service or an adaptor at the implementor’s choice.

When using to_graph / from_graph, __request_id__ needs to be used to distinguish different usages.

Calling from_graph first allows the usage of a system generated __request_id__, using the pattern:

id = my_interface.from_graph(path='...')
...
my_interface.to_graph(..., __request_id__=id, ...)

This will allow for unique association to the adaptor “instance”.
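
The keyed request and reply flow can be sketched with plain dictionaries standing in for the TSD. Everything in this example is hypothetical (`ToyServiceAdaptor`, `new_client`, `submit`, `run`); it only illustrates how integer ids let one implementation serve several clients, not how HGraph wires the adaptor.

```python
from itertools import count
from typing import Callable, Dict


class ToyServiceAdaptor:
    """Combines per-client requests into one dict keyed by client id."""

    def __init__(self, impl: Callable[[Dict[int, str]], Dict[int, str]]):
        self._impl = impl
        self._ids = count()                  # source of system generated ids
        self._requests: Dict[int, str] = {}

    def new_client(self) -> int:
        """Allocate the id that ties a client's request to its reply."""
        return next(self._ids)

    def submit(self, request_id: int, request: str) -> None:
        self._requests[request_id] = request

    def run(self) -> Dict[int, str]:
        """The implementation sees all requests and replies under the same keys."""
        return self._impl(dict(self._requests))


adaptor = ToyServiceAdaptor(
    lambda requests: {cid: text.upper() for cid, text in requests.items()}
)
first = adaptor.new_client()
second = adaptor.new_client()
adaptor.submit(first, "ping")
adaptor.submit(second, "pong")
replies = adaptor.run()
```

Each client then reads its own reply by looking up the id it was given.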

service_adaptor_impl(*, interfaces: Sequence[SERVICE_DEFINITION] | SERVICE_DEFINITION = None, resolvers: Mapping[TypeVar, Callable] = None, label: str | None = None, deprecated: bool | str = False)[source]

Wraps a service adaptor implementation.

component(fn=None, *, recordable_id=None, resolvers=None, label=None, deprecated: bool | str = False)[source]
Overloads:
  • fn (GRAPH_SIGNATURE) → GRAPH_SIGNATURE

  • fn (None), recordable_id (str | None), resolvers (Mapping[‘TypeVar’, Callable]), label (str | None), deprecated (bool | str) → Callable[[GRAPH_SIGNATURE], GRAPH_SIGNATURE]

Decorator to indicate the function being wrapped is a graph that is recordable. A component is a graph, with the following constraints:

  • The component should not access any services unless the service is already able to support replay mode.

  • The component MAY NOT use ANY push source nodes.

  • The component should not use any pull source nodes that are not replay compliant.

  • The component should not use any sink nodes that will have side effects other than, for example, printing or logging.

The component is expected to be idempotent (i.e., given the same inputs, the graph should produce the same results) and have no side effects.

When using the component in save/restore mode, the component should be able to produce a correct result if all the inputs were re-ticked into the graph and re-computed, in other words, the component should not depend on the order of tick arrival to produce the same result. <This constraint will be removed as soon as possible, but there are a number of complexities to fully manage the saving and restoration of graph state correctly>

The key idea behind a component is that it represents a meaningful amount of work that will benefit from regression testing in the form of replaying the inputs and comparing the results produced for correctness. This will also support recording inputs and results for logging and diagnostic purposes. Finally, it is possible to use components to support restorable computations and to allow for skipping re-computation of already computed results. This allows for re-running graphs when one or more components have failed without having to re-compute each node.
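
The regression-testing idea can be reduced to a few lines of ordinary Python: record the inputs and results of a deterministic computation, replay the inputs later, and compare. The functions below are hypothetical stand-ins for a component and a replay harness and do not use HGraph's recording facilities.

```python
from typing import Callable, List


def running_total(inputs: List[int]) -> List[int]:
    """Deterministic stand-in for a component: same inputs, same results."""
    total = 0
    out = []
    for value in inputs:
        total += value
        out.append(total)
    return out


def replay_matches(component: Callable[[List[int]], List[int]],
                   recorded_inputs: List[int],
                   recorded_outputs: List[int]) -> bool:
    """Replay the recorded inputs and compare against the recorded results."""
    return component(list(recorded_inputs)) == recorded_outputs


recorded_inputs = [1, 2, 3]
recorded_outputs = running_total(recorded_inputs)   # captured on the first run


def changed_total(inputs: List[int]) -> List[int]:
    """A later, behaviour-changing version that the replay should catch."""
    return [v + 1 for v in running_total(inputs)]
```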

The recordable_id represents the unique id of the component within the context of the master graph. If no id is provided, then the name of the component is used as the id. If more than one instance of the component can exist in the master graph, then the component needs to be able to create a unique identity. To support this, the recordable_id can be a format string that builds the id from the scalar properties of the component or any const inputs provided (for example, a key in a bundle). Here is an example:

@component(recordable_id='my_component.{key}')
def my_component(key: TS[str], ...) -> TIME_SERIES_VALUE:
    ...

If this is then used in a map_, the key will be available at start and will be used to create an id for each instance.
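
The effect of the format string can be previewed with Python's own str.format. That HGraph expands the template in exactly this way is an assumption; the keys below are made-up values for illustration.

```python
template = "my_component.{key}"

# One id per key, as would be needed for one component instance per key.
keys = ("EURUSD", "GBPUSD")
instance_ids = [template.format(key=k) for k in keys]
```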

pull_source_node(fn=None, /, node_impl=None, resolvers=None, requires=None, label=None, deprecated: bool | str = False)[source]
Overloads:
  • fn (SOURCE_NODE_SIGNATURE) → SOURCE_NODE_SIGNATURE

  • fn (None), node_impl (Any), resolvers (Mapping[‘TypeVar’, Callable]), requires (Callable[[…, …], bool]), label (str | None), deprecated (bool | str) → Callable[[SOURCE_NODE_SIGNATURE], SOURCE_NODE_SIGNATURE]

Used to indicate the signature for a source node. For Python source nodes use either the generator or source_adapter annotations.

push_source_node(fn=None, /, node_impl=None, resolvers=None, requires=None, label=None, deprecated: bool | str = False)[source]
Overloads:
  • fn (SOURCE_NODE_SIGNATURE) → SOURCE_NODE_SIGNATURE

  • fn (None), node_impl (Any), resolvers (Mapping[‘TypeVar’, Callable]), requires (Callable[[…, …], bool]), label (str | None), deprecated (bool | str) → Callable[[SOURCE_NODE_SIGNATURE], SOURCE_NODE_SIGNATURE]

Used to indicate the signature for a push source node.

Supporting functions

In order to support the service and adaptor decorators, the following functions are defined:

register_service(path: str, implementation, resolution_dict=None, **kwargs)[source]

Binds the implementation of the interface to the path provided. The additional kwargs are passed to the implementation. These should be defined on the implementation and are independent of the attributes defined in the service.

register_adaptor(path: str, implementation, resolution_dict=None, **kwargs)[source]

Binds the implementation of the adaptor to the path provided. The additional kwargs are passed to the implementation. These should be defined on the implementation and are independent of the attributes defined in the adaptor.