Part VIII: Advanced Concepts
Version: 1.0 Draft Last Updated: 2025-12-20
1. Introduction
This document covers advanced HGraph concepts including:
Operator overloading and resolution
Custom type resolvers
Services (subscription, reference, request-reply)
Adaptors (single-client and multi-client)
Components
Error handling
graph TD
ADV[Advanced Concepts]
ADV --> OVL[Operator Overloading]
ADV --> RES[Custom Resolvers]
ADV --> SVC[Services]
ADV --> ADP[Adaptors]
ADV --> CMP[Components]
ADV --> ERR[Error Handling]
SVC --> SUB[Subscription]
SVC --> REF[Reference]
SVC --> RR[Request-Reply]
ADP --> SA[Single-Client]
ADP --> MA[Multi-Client]
2. Operator Overloading
2.1 The @operator Decorator
The @operator decorator defines a polymorphic operation signature without implementation:
@operator
def add_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE_1) -> DEFAULT[OUT]:
"""This represents the ``+`` operator for time series types."""
Key Characteristics:
Creates an
OperatorWiringNodeClassinstanceFunction body is empty (
...orpass)Defines generic type parameters that implementations can specialize
Does NOT enforce exact signature matching (template-like behavior)
2.2 Registering Overloads
Implementations register via the overloads parameter:
@compute_node(overloads=add_)
def add_ints(lhs: TS[int], rhs: TS[int]) -> TS[int]:
return lhs.value + rhs.value
@compute_node(overloads=add_)
def add_floats(lhs: TS[float], rhs: TS[float]) -> TS[float]:
return lhs.value + rhs.value
@compute_node(overloads=add_)
def add_generic(lhs: TS[SCALAR], rhs: TS[SCALAR]) -> TS[SCALAR]:
return lhs.value + rhs.value # Fallback for any scalar
2.3 Complete Working Example
from hgraph import operator, compute_node, graph, TS, TIME_SERIES_TYPE, SIZE, TSL
# Step 1: Define the operator signature
@operator
def add(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""Generic addition operator."""
...
# Step 2: Register type-specific overloads
@compute_node(overloads=add)
def add_default(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
"""Fallback implementation for any type."""
return lhs.value + rhs.value
@compute_node(overloads=add)
def add_ints(lhs: TS[int], rhs: TS[int]) -> TS[int]:
"""Specialized for integers - adds extra 1."""
return lhs.value + rhs.value + 1
@compute_node(overloads=add)
def add_strs(lhs: TS[str], rhs: TS[str]) -> TS[str]:
"""Specialized for strings - adds suffix."""
return lhs.value + rhs.value + "~"
@graph(overloads=add)
def add_tsls(lhs: TSL[TIME_SERIES_TYPE, SIZE], rhs: TSL[TIME_SERIES_TYPE, SIZE]) -> TSL[TIME_SERIES_TYPE, SIZE]:
"""Specialized for time-series lists."""
return TSL.from_ts(*[a + b for a, b in zip(lhs, rhs)])
# Step 3: Use the operator - correct overload selected automatically
@graph
def test_add(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE:
return add(lhs, rhs)
# Results:
# add(TS[int](1), TS[int](2)) -> 4 (uses add_ints: 1+2+1)
# add(TS[float](1.0), TS[float](2.0)) -> 3.0 (uses add_default)
# add(TS[str]("a"), TS[str]("b")) -> "ab~" (uses add_strs)
2.4 Overload Resolution
sequenceDiagram
participant C as Caller
participant O as Operator
participant H as OverloadHelper
participant I as Implementations
C->>O: add_(TS[int], TS[int])
O->>H: _check_overloads()
H->>I: Try resolve each overload
I-->>H: Candidates with ranks
H->>H: Sort by rank (ascending)
H->>H: Select lowest rank
H-->>O: Best match: add_ints
O-->>C: WiringPort
Resolution Algorithm:
Collect Candidates: Try to resolve each overload’s signature against provided arguments
Calculate Ranks: Each candidate receives a rank based on type specificity
Select Best: Choose the candidate with the lowest rank (most specific)
Outcome |
Action |
|---|---|
One clear winner |
Use that overload |
Multiple same rank |
Raise ambiguity error |
No candidates |
Raise “no overload found” error |
2.5 Generic Rank System
The rank quantifies type specificity. Lower rank = more specific = preferred.
graph LR
subgraph "Rank Examples"
R0["TS[int] → rank ≈ 1e-10<br/>(most specific)"]
R1["TS[SCALAR] → rank ≈ 1.0"]
R2["TIME_SERIES_TYPE → rank ≈ 1.0+<br/>(least specific)"]
end
R0 --> |"preferred over"| R1
R1 --> |"preferred over"| R2
Rank Calculation:
def _calc_rank(signature: WiringNodeSignature) -> float:
if signature.node_type == WiringNodeType.OPERATOR:
return 1e6 # Operators themselves have lowest priority
ranks = []
for k, t in signature.input_types.items():
if signature.defaults.get(k) != AUTO_RESOLVE:
if t.is_scalar:
rank = scale_rank(t.generic_rank, 0.001) # Scalars scaled down
elif k in (signature.var_arg, signature.var_kwarg):
rank = scale_rank(t.generic_rank, 100.0) # Var args scaled up
else:
rank = t.generic_rank # Time-series get full rank
ranks.append(rank)
return sum(combine_ranks(ranks).values())
Type Rank Values:
Type |
Rank |
Notes |
|---|---|---|
Concrete ( |
~1e-10 |
Very specific |
TypeVar ( |
~1.0 |
Generic |
|
~1.0 |
Generic |
Nested generics |
Scaled |
Combines inner ranks |
2.6 Real-World Example: Number Operators
From hgraph/_impl/_operators/_number_operators.py:
from hgraph import add_, sub_, mul_, div_, compute_node, TS, NUMBER, NUMBER_2
from hgraph._operators._operators import DivideByZero
@compute_node(overloads=add_)
def add_float_to_int(lhs: TS[int], rhs: TS[float]) -> TS[float]:
"""Adds a timeseries of float to a timeseries of int."""
return lhs.value + rhs.value
@compute_node(overloads=add_)
def add_int_to_float(lhs: TS[float], rhs: TS[int]) -> TS[float]:
"""Adds a timeseries of int to a timeseries of float."""
return lhs.value + rhs.value
@compute_node(overloads=div_)
def div_numbers(
lhs: TS[NUMBER],
rhs: TS[NUMBER_2],
divide_by_zero: DivideByZero = DivideByZero.ERROR
) -> TS[float]:
"""Divides numeric timeseries with configurable zero handling."""
try:
return lhs.value / rhs.value
except ZeroDivisionError:
match divide_by_zero:
case DivideByZero.NAN:
return float("NaN")
case DivideByZero.INF:
return float("inf")
case DivideByZero.NONE:
return None # No output
case DivideByZero.ZERO:
return 0.0
case _:
raise
2.7 Python Operator Mapping
Operators are mapped to Python special methods:
WiringPort.__add__ = lambda x, y: add_(x, y)
WiringPort.__sub__ = lambda x, y: sub_(x, y)
WiringPort.__mul__ = lambda x, y: mul_(x, y)
# ... etc
2.8 Import Requirements
Overloads must be imported before use. The typical pattern:
# In package __init__.py
from my_package._impl._operators import (
add_ints,
add_floats,
add_strings,
# ... all overloads
)
3. Custom Resolvers
3.1 AUTO_RESOLVE Sentinel
AUTO_RESOLVE marks parameters that should be automatically resolved from input types:
AUTO_RESOLVE = object() # Sentinel value
@compute_node
def my_node(ts: TS[SCALAR], _tp: type[SCALAR] = AUTO_RESOLVE) -> TS[SCALAR]:
# _tp will be automatically resolved to the actual SCALAR type
return ts.value
Usage Pattern:
Used as default value for type parameters
Wiring system resolves actual type from TypeVar resolution
Common for extracting types from resolved inputs
3.2 Custom Resolver Functions
The resolvers parameter provides custom type resolution logic:
@compute_node(
resolvers={SCALAR: lambda mapping, attr: mapping[COMPOUND_SCALAR].meta_data_schema[attr].py_type}
)
def getattr_cs(ts: TS[COMPOUND_SCALAR], attr: str, default: TS[SCALAR] = None) -> TS[SCALAR]:
return getattr(ts.value, attr, default.value)
3.3 Resolver Function Signatures
Style 1: Single Parameter (Mapping Only)
lambda mapping: resolved_type
Resolver receives only the resolution mapping dictionary.
Style 2: Multiple Parameters (Mapping + Scalars)
lambda mapping, param1, param2: resolved_type
Resolver receives mapping plus named scalar parameters from the function.
3.4 Resolution Process
sequenceDiagram
participant W as Wiring
participant S as Signature
participant R as Resolvers
W->>S: build_resolution_dict()
S->>S: Resolve input types
S->>S: Build resolution mapping
S->>R: Call custom resolvers
Note over R: Resolver receives:<br/>- mapping dict<br/>- scalar params
R-->>S: Resolved type
S->>S: Validate resolved type
S-->>W: Complete resolution
3.5 Resolver Examples
Example 1: Attribute Type Resolution
From hgraph/_impl/_operators/_getattr.py:
@compute_node(
overloads=getattr_,
resolvers={SCALAR: lambda mapping, attr: mapping[COMPOUND_SCALAR].meta_data_schema[attr].py_type},
)
def getattr_cs(ts: TS[COMPOUND_SCALAR], attr: str, default_value: TS[SCALAR] = None) -> TS[SCALAR]:
"""Get attribute from compound scalar, resolving output type from schema."""
attr_value = getattr(ts.value, attr, default_value.value)
return default_value.value if attr_value is None else attr_value
@compute_node(
overloads=setattr_,
resolvers={SCALAR: lambda mapping, attr: mapping[COMPOUND_SCALAR].meta_data_schema[attr].py_type},
)
def setattr_cs(ts: TS[COMPOUND_SCALAR], attr: str, value: TS[SCALAR]) -> TS[COMPOUND_SCALAR]:
"""Set attribute on compound scalar."""
v = copy(ts.value)
setattr(v, attr, value.value)
return v
Example 2: Tuple Element Type Resolution
From hgraph/_impl/_operators/_tuple_operators.py:
def _item_type(tuple_tp: Type[TUPLE], index: int) -> Type:
"""Resolve the type of a tuple element at given index."""
if isinstance(tuple_tp, HgTupleFixedScalarType):
return tuple_tp.element_types[index]
elif isinstance(tuple_tp, HgTupleCollectionScalarType):
return tuple_tp.element_type
raise IncorrectTypeBinding(TUPLE, tuple_tp)
@compute_node(
overloads=getitem_,
resolvers={SCALAR: lambda mapping, key: _item_type(mapping[TUPLE], key)}
)
def getitem_tuple_fixed(ts: TS[TUPLE], key: int) -> TS[SCALAR]:
"""Get element from tuple at index, with type resolution."""
return ts.value[key]
Example 3: DataFrame Schema Resolution
From hgraph/_impl/_operators/_data_frame_operators.py:
def _cs_from_frame(mapping, df: pl.DataFrame) -> COMPOUND_SCALAR:
"""Create compound scalar type from DataFrame schema."""
schema = df.schema
return compound_scalar(**{k: _convert_type(v) for k, v in schema.items()})
def _extract_scalar(mapping, df, value_col) -> SCALAR:
"""Extract scalar type from DataFrame column."""
schema = df.schema
return _convert_type(schema[value_col])
@generator(
overloads=from_data_frame,
resolvers={SCALAR: _extract_scalar, COMPOUND_SCALAR: _cs_from_frame},
)
def from_data_frame_ts(
df: Frame[COMPOUND_SCALAR],
dt_col: str = "date",
value_col: str = "value",
_df_tp: type[COMPOUND_SCALAR] = AUTO_RESOLVE,
) -> TS[SCALAR]:
"""Generate time-series from DataFrame column."""
# ... implementation
Example 4: Function Return Type Resolution
@compute_node(
valid=("fn",),
resolvers={TIME_SERIES_TYPE: lambda m, fn: TS[fn.__annotations__["return"]]}
)
def apply(fn: TS[Callable], *args: TSB[TS_SCHEMA]) -> DEFAULT[TIME_SERIES_TYPE]:
"""Apply callable, resolving output type from function annotation."""
fn_ = fn.value
return fn_(*args)
3.6 Unit Test Example
def test_func_resolve():
def x(x) -> str:
return str(x)
@compute_node(
resolvers={SCALAR_1: lambda mapping, f: f.__annotations__["return"]}
)
def call(ts: TS[SCALAR], f: type(x)) -> TS[SCALAR_1]:
return f(ts.value)
assert eval_node(call[SCALAR:int], [1, 2], f=x) == ["1", "2"]
4. Services
Services enable request-reply and publish-subscribe patterns within HGraph.
4.1 Service Types Overview
graph TD
SVC[Service Types]
SVC --> SUB[Subscription Service]
SVC --> REF[Reference Service]
SVC --> RR[Request-Reply Service]
SUB --> |"1:N streaming"| SUB_DESC[Client subscribes,<br/>receives stream]
REF --> |"Shared state"| REF_DESC[Single value,<br/>all clients share]
RR --> |"1:1 response"| RR_DESC[Request in,<br/>response out]
4.2 Subscription Service
A streaming service where clients subscribe with keys and receive ongoing updates.
Definition:
@subscription_service
def my_subs_service(path: str, subscription: TS[str]) -> TS[str]:
"""The service description."""
Implementation:
@service_impl(interfaces=my_subs_service)
def my_subs_service_impl(subscription: TSS[str]) -> TSD[str, TS[str]]:
"""Implementation receives set of subscriptions, returns dict of results."""
return map_(pass_through_node, __keys__=subscription, __key_arg__="ts")
Complete Working Example:
from hgraph import (
subscription_service, service_impl, graph, register_service,
default_path, TS, TSS, TSD, TSL, SIZE, map_, pass_through
)
@subscription_service
def my_subs_service(path: str, subscription: TS[str]) -> TS[str]:
"""Subscribe to receive string updates."""
@graph
def subscription_instance(key: TS[str]) -> TS[str]:
return key
@service_impl(interfaces=my_subs_service)
def my_subs_service_impl(subscription: TSS[str]) -> TSD[str, TS[str]]:
return map_(pass_through_node, __keys__=subscription, __key_arg__="ts")
@graph
def main(sub1: TS[str], sub2: TS[str], sub3: TS[str]) -> TSL[TS[str], SIZE]:
register_service(default_path, my_subs_service_impl)
return TSL.from_ts(
pass_through_node(my_subs_service(default_path, sub1)),
pass_through_node(my_subs_service(default_path, sub2)),
pass_through_node(my_subs_service(default_path, sub3)),
)
# Usage:
# eval_node(main, ["topic1", None, None], ["topic2", None, None], [None, None, "topic1"])
# Result: [None, {0: "topic1", 1: "topic2"}, {2: "topic1"}]
Characteristics:
Input: Individual
TS[T]per clientImplementation receives:
TSS[T](set of subscriptions)Implementation returns:
TSD[T, TS[Result]](keyed results)
4.3 Reference Service
A service producing a single shared value independent of requesters.
Definition:
@reference_service
def my_service(path: str = None) -> TSD[str, TS[str]]:
"""The service description."""
Implementation:
@service_impl(interfaces=my_service)
def my_service_impl() -> TSD[str, TS[str]]:
return const(frozendict({"test": "a value"}), TSD[str, TS[str]])
Complete Working Example:
from hgraph import (
reference_service, service_impl, graph, register_service,
default_path, TS, TSD, const, frozendict
)
@reference_service
def config_service(path: str = None) -> TSD[str, TS[str]]:
"""Get shared configuration."""
@service_impl(interfaces=config_service)
def config_service_impl() -> TSD[str, TS[str]]:
return const(frozendict({"api_key": "secret", "endpoint": "https://api.example.com"}))
@graph
def main() -> TS[str]:
register_service(default_path, config_service_impl)
config = config_service()
return config["api_key"]
# Usage: eval_node(main) == ["secret"]
Characteristics:
Single output shared by all clients
Output type automatically wrapped as
REF[TIME_SERIES_TYPE]Auto-binds to
ref_svc://<module>.<name>if no path provided
4.4 Request-Reply Service
A synchronous request-response pattern with timeout handling.
Definition:
@request_reply_service
def add_one_service(path: str, ts: TS[int]) -> TS[int]:
"""The service description."""
Implementation:
@service_impl(interfaces=add_one_service)
def add_one_service_impl(ts: TSD[int, TS[int]]) -> TSD[int, TS[int]]:
return map_(lambda x: x + 1, ts)
Complete Working Example:
from hgraph import (
request_reply_service, service_impl, graph, register_service,
default_path, TS, TSD, map_
)
@request_reply_service
def add_service(path: str, ts: TS[int], ts1: TS[int]) -> TS[int]:
"""Add two numbers via service."""
@service_impl(interfaces=add_service)
def add_service_impl(ts: TSD[int, TS[int]], ts1: TSD[int, TS[int]]) -> TSD[int, TS[int]]:
return map_(lambda x, y: x + y, ts, ts1)
@graph
def main(x: TS[int], y: TS[int]) -> TS[int]:
register_service(default_path, add_service_impl)
return add_service(default_path, x, y)
# Usage: eval_node(main, [1], [2]) == [None, None, 3]
# Note: Two None ticks for service setup latency
Response Type:
TSB[ReqRepResponse[TIME_SERIES_TYPE_1]]
# Contains:
# result: TIME_SERIES_TYPE_1
# time_out: TS[bool]
# error: TS[str]
4.5 Multi-Service Implementation
A single implementation can serve multiple service interfaces:
from hgraph import (
request_reply_service, reference_service, subscription_service,
service_impl, get_service_inputs, set_service_output
)
@request_reply_service
def submit(path: str, ts: TS[int]): ...
@reference_service
def receive(path: str) -> TSS[int]: ...
@subscription_service
def subscribe(path: str, ts: TS[int]) -> TS[bool]: ...
@service_impl(interfaces=(submit, receive, subscribe))
def impl(path: str):
"""Single implementation handles all three service interfaces."""
submissions: TSD[int, TS[int]] = get_service_inputs(path, submit).ts
items = flip(submissions).key_set
set_service_output(path, receive, items)
set_service_output(
path,
subscribe,
map_(
lambda key, i: contains_(i, key),
__keys__=subscribe.wire_impl_inputs_stub(path).ts,
i=pass_through_node(items),
),
)
Helper Functions:
Function |
Purpose |
|---|---|
|
Retrieve the combined inputs for a service interface (returns struct with input fields) |
|
Set the output for a service interface to broadcast to clients |
4.6 Service Registration
from hgraph import register_service, default_path
# Register implementation for a path
register_service("my_path", my_impl, resolution_dict={...})
# Use default path
register_service(default_path, my_impl)
Path Format: <service_type>://<path>/<function_name>
5. Adaptors
Adaptors provide bidirectional connectivity between HGraph and external systems.
5.1 Single-Client Adaptor
For point-to-point external communication.
Interface Definition:
@adaptor
def my_adaptor(path: str, ts: TS[int]) -> TS[int]:
"""Send data out, receive data in."""
Implementation:
@adaptor_impl(interfaces=my_adaptor)
def my_adaptor_impl(path: str, ts: TS[int]) -> TS[int]:
queue_sink(path, ts) # Send to external
return queue_source(path) # Receive from external
Complete Working Example:
from hgraph import (
adaptor, adaptor_impl, graph, register_adaptor,
TS, queue_sink, queue_source, count, schedule,
stop_on_value, evaluate_graph, GraphConfiguration, EvaluationMode
)
from datetime import timedelta
@adaptor
def my_adaptor(path: str, ts: TS[int]) -> TS[int]:
"""Echo adaptor - sends and receives integers."""
@adaptor_impl(interfaces=my_adaptor)
def my_adaptor_impl(path: str, ts: TS[int]) -> TS[int]:
queue_sink(path, ts)
return queue_source(path)
@graph
def g() -> TS[int]:
register_adaptor("test_adaptor", my_adaptor_impl)
result = my_adaptor(
"test_adaptor",
count(schedule(timedelta(milliseconds=10), max_ticks=10))
)
stop_on_value(result, 10)
return result
# Execute in real-time mode
result = evaluate_graph(
g,
GraphConfiguration(
run_mode=EvaluationMode.REAL_TIME,
end_time=timedelta(milliseconds=1000)
)
)
# result: [(t1, 1), (t2, 2), ..., (t10, 10)]
With Parameters:
@adaptor
def my_adaptor(path: str, b: bool, ts: TS[int]) -> TS[int]: ...
@adaptor_impl(interfaces=my_adaptor)
def my_adaptor_impl(path: str, b: bool, ts: TS[int]) -> TS[int]:
path = f"{path}_{b}" # Use parameter to customize path
queue_sink(path, ts if b else ts + 1)
return queue_source(path)
5.2 Multi-Client Service Adaptor
For multiple concurrent client connections.
Interface Definition:
@service_adaptor
def my_adaptor(path: str, ts: TS[int]) -> TS[int]:
"""Handle multiple client requests."""
Implementation:
@service_adaptor_impl(interfaces=my_adaptor)
def my_adaptor_impl(path: str, ts: TSD[int, TS[int]]) -> TSD[int, TS[int]]:
"""Implementation receives multiplexed by ClientId."""
tsd_queue_sink(path, ts)
return tsd_queue_source(path)
Complete Working Example:
from hgraph import (
service_adaptor, service_adaptor_impl, graph, register_adaptor,
TS, TSD, TSL, Size, tsd_queue_sink, tsd_queue_source, map_,
count, schedule, combine, stop_on_tsl_values, evaluate_graph
)
from datetime import timedelta
@service_adaptor
def my_adaptor(path: str, ts: TS[int]) -> TS[int]: ...
@service_adaptor_impl(interfaces=my_adaptor)
def my_adaptor_impl(path: str, b: bool, ts: TSD[int, TS[int]]) -> TSD[int, TS[int]]:
path = f"{path}_{b}"
tsd_queue_sink(path, ts if b else map_(lambda x: x + 1, ts))
return tsd_queue_source(path)
@graph
def g() -> TSL[TS[int], Size[2]]:
register_adaptor(None, my_adaptor_impl, b=False)
a1 = my_adaptor("test", ts=count(schedule(timedelta(milliseconds=10), max_ticks=10)))
a2 = my_adaptor("test", ts=count(schedule(timedelta(milliseconds=11), max_ticks=10)))
result = combine(a1, a2)
stop_on_tsl_values(result, 11, 11)
return result
Characteristics:
Each client gets unique integer
ClientIdRequests combined as
TSD[ClientId, Request]Responses multiplexed as
TSD[ClientId, Response]
Sink-Only Adaptor:
@service_adaptor
def my_adaptor(path: str, ts: TS[int]):
"""Sink-only - no return value."""
@service_adaptor_impl(interfaces=my_adaptor)
def my_adaptor_impl(path: str, ts: TSD[int, TS[int]]):
log_(f"{path}: {{}}", ts)
6. Components
Components are reusable graph building blocks with record/replay support.
6.1 Definition
@component
def my_component(ts: TS[float], key: str) -> TS[float]:
"""A reusable computation component."""
return ts + 1.0
# Usage:
# eval_node(my_component, ts=[1.0, 2.0, 3.0], key="key_1") == [2.0, 3.0, 4.0]
6.2 With Dynamic Recordable ID
@component(recordable_id="Test_{key}")
def my_component(ts: TS[float], key: TS[str]) -> TS[float]:
"""Component with dynamic recordable ID based on key parameter."""
return ts + 1.0
# The recordable_id supports format strings from time-series string (TS[str]) parameters
6.3 Complete Record/Replay Example
from hgraph import (
component, graph, TS, TSD, map_, add_,
RecordReplayContext, RecordReplayEnum, set_record_replay_model,
IN_MEMORY, GlobalState, frozendict as fd
)
@component
def my_component(a: TSD[str, TS[float]], b: TSD[str, TS[float]]) -> TSD[str, TS[float]]:
"""Component that adds two TSDs element-wise."""
return map_(add_[TS[float]], a, b)
def test_record_replay():
with GlobalState() as gs:
set_record_replay_model(IN_MEMORY)
# Record execution
with RecordReplayContext(mode=RecordReplayEnum.RECORD):
result = eval_node(
my_component,
a=[fd(a=1.0, b=2.0), fd(a=2.0)],
b=[fd(a=3.0, b=2.0), fd(b=1.0)]
)
assert result == [fd(a=4.0, b=4.0), fd(a=5.0, b=3.0)]
# Replay recorded data
with RecordReplayContext(mode=RecordReplayEnum.REPLAY):
result = eval_node(my_component, a=[], b=[])
assert result == [fd(a=4.0, b=4.0), fd(a=5.0, b=3.0)]
6.4 Characteristics
Property |
Description |
|---|---|
Inputs |
Must have time-series inputs (required) |
Output |
Must have time-series output (required) |
Record/Replay |
Automatic wrapper for input/output recording |
References |
Inputs/outputs converted to reference form |
Nesting |
|
6.5 Record/Replay Integration
graph LR
IN[Inputs] --> IW[Input Wrapper]
IW --> |"records"| COMP[Component Logic]
COMP --> OW[Output Wrapper]
OW --> |"records"| OUT[Outputs]
REC[Record/Replay Context] -.-> IW
REC -.-> OW
7. Error Handling
7.1 try_except Pattern
Wraps graphs with exception catching:
from hgraph import try_except, TSB, TryExceptResult
result = try_except(risky_node, input_ts, __trace_back_depth__=2)
# result: TSB[TryExceptResult[output_type]]
7.2 Complete Working Examples
Basic try_except:
from hgraph import graph, TS, TSB, TryExceptResult, try_except, div_
@graph
def main(lhs: TS[float], rhs: TS[float]) -> TSB[TryExceptResult[TS[float]]]:
return try_except(div_[TS[float]], lhs, rhs)
result = eval_node(main, [1.0, 2.0, 3.0], [1.0, 2.0, 0.0])
# result[0] == {"out": 1.0}
# result[1] == {"out": 1.0}
# result[2].keys() == {"exception"} # Division by zero caught
try_except with Graph:
@graph
def error_prone(lhs: TS[float], rhs: TS[float]) -> TS[float]:
return lhs / rhs
@graph
def main(lhs: TS[float], rhs: TS[float]) -> TSB[TryExceptResult[TS[float]]]:
return try_except(error_prone, lhs, rhs)
result = eval_node(main, [1.0, 2.0], [2.0, 0.0])
# result[0]["out"] == 0.5
# result[1]["exception"] is not None
try_except with Map (Multi-client errors):
from hgraph import TryExceptTsdMapResult, REF
schema = ts_schema(out=TSD[int, REF[TS[float]]], error=TSD[int, TS[NodeError]])
@graph
def main(lhs: TSD[int, TS[float]], rhs: TSD[int, TS[float]]) -> TSB[TryExceptTsdMapResult[int, TSD[int, TS[float]]]]:
return try_except(map_, div_[TS[float]], lhs, rhs)
result = eval_node(main, [{0: 1.0}, {1: 2.0}, {2: 3.0}], [{0: 1.0}, {1: 2.0}, {2: 0.0}])
# result[0] == {"out": frozendict({0: 1.0})}
# result[1] == {"out": frozendict({1: 1.0})}
# result[2] contains "exception" for key 2
try_except with Sink Nodes:
from hgraph import sink_node, NodeError
@sink_node
def risky_sink(ts: TS[float]):
if ts.value == 2.0:
raise RuntimeError("Test error")
@graph
def main(ts: TS[float]) -> TS[NodeError]:
return try_except(risky_sink, ts)
result = eval_node(main, [1.0, 2.0])
# result[0] is None # No error
# result[1].error_msg.endswith("Test error") # Error captured
7.3 exception_time_series
Lightweight error extraction for single nodes:
from hgraph import exception_time_series, ts_schema, NodeError
schema = ts_schema(out=TS[float], error=TS[NodeError])
@graph
def main(lhs: TS[float], rhs: TS[float]) -> TSB[schema]:
out = lhs / rhs
return TSB[schema].from_ts(out=out, error=exception_time_series(out))
result = eval_node(main, [1.0, 2.0, 3.0], [1.0, 2.0, 0.0])
# result[0:2] == [{"out": 1.0}, {"out": 1.0}]
# result[2].keys() == {"error"}
7.4 Parameters
Parameter |
Description |
|---|---|
|
Stack frames to capture (default: 1) |
|
Capture input values in traceback (default: False) |
7.5 Exception Output by Node Type
Node Type |
Exception Output |
|---|---|
compute_node |
|
sink_node |
|
TSD map |
|
7.6 NodeError Type
class NodeError:
"""Contains exception information from node evaluation."""
exception_type: type
message: str
traceback: str
input_values: dict | None # If __capture_values__=True
8. Reference Locations
Concept |
Python Location |
|---|---|
@operator |
|
OperatorWiringNodeClass |
|
Generic Rank |
|
AUTO_RESOLVE |
|
Custom Resolvers |
|
@subscription_service |
|
@reference_service |
|
@request_reply_service |
|
@service_impl |
|
@adaptor |
|
@service_adaptor |
|
@component |
|
try_except |
|
NodeError |
|
9. Next Steps
Continue to:
09_CONTROL_FLOW.md - Control flow constructs
10_DATA_SOURCES.md - Data source patterns