# Part VIII: Advanced Concepts **Version:** 1.0 Draft **Last Updated:** 2025-12-20 --- ## 1. Introduction This document covers advanced HGraph concepts including: - Operator overloading and resolution - Custom type resolvers - Services (subscription, reference, request-reply) - Adaptors (single-client and multi-client) - Components - Error handling ```mermaid graph TD ADV[Advanced Concepts] ADV --> OVL[Operator Overloading] ADV --> RES[Custom Resolvers] ADV --> SVC[Services] ADV --> ADP[Adaptors] ADV --> CMP[Components] ADV --> ERR[Error Handling] SVC --> SUB[Subscription] SVC --> REF[Reference] SVC --> RR[Request-Reply] ADP --> SA[Single-Client] ADP --> MA[Multi-Client] ``` --- ## 2. Operator Overloading ### 2.1 The @operator Decorator The `@operator` decorator defines a polymorphic operation signature without implementation: ```python @operator def add_(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE_1) -> DEFAULT[OUT]: """This represents the ``+`` operator for time series types.""" ``` **Key Characteristics:** - Creates an `OperatorWiringNodeClass` instance - Function body is empty (`...` or `pass`) - Defines generic type parameters that implementations can specialize - Does NOT enforce exact signature matching (template-like behavior) ### 2.2 Registering Overloads Implementations register via the `overloads` parameter: ```python @compute_node(overloads=add_) def add_ints(lhs: TS[int], rhs: TS[int]) -> TS[int]: return lhs.value + rhs.value @compute_node(overloads=add_) def add_floats(lhs: TS[float], rhs: TS[float]) -> TS[float]: return lhs.value + rhs.value @compute_node(overloads=add_) def add_generic(lhs: TS[SCALAR], rhs: TS[SCALAR]) -> TS[SCALAR]: return lhs.value + rhs.value # Fallback for any scalar ``` ### 2.3 Complete Working Example ```python from hgraph import operator, compute_node, graph, TS, TIME_SERIES_TYPE, SIZE, TSL # Step 1: Define the operator signature @operator def add(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE: """Generic addition operator.""" ... # Step 2: Register type-specific overloads @compute_node(overloads=add) def add_default(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE: """Fallback implementation for any type.""" return lhs.value + rhs.value @compute_node(overloads=add) def add_ints(lhs: TS[int], rhs: TS[int]) -> TS[int]: """Specialized for integers - adds extra 1.""" return lhs.value + rhs.value + 1 @compute_node(overloads=add) def add_strs(lhs: TS[str], rhs: TS[str]) -> TS[str]: """Specialized for strings - adds suffix.""" return lhs.value + rhs.value + "~" @graph(overloads=add) def add_tsls(lhs: TSL[TIME_SERIES_TYPE, SIZE], rhs: TSL[TIME_SERIES_TYPE, SIZE]) -> TSL[TIME_SERIES_TYPE, SIZE]: """Specialized for time-series lists.""" return TSL.from_ts(*[a + b for a, b in zip(lhs, rhs)]) # Step 3: Use the operator - correct overload selected automatically @graph def test_add(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE) -> TIME_SERIES_TYPE: return add(lhs, rhs) # Results: # add(TS[int](1), TS[int](2)) -> 4 (uses add_ints: 1+2+1) # add(TS[float](1.0), TS[float](2.0)) -> 3.0 (uses add_default) # add(TS[str]("a"), TS[str]("b")) -> "ab~" (uses add_strs) ``` ### 2.4 Overload Resolution ```mermaid sequenceDiagram participant C as Caller participant O as Operator participant H as OverloadHelper participant I as Implementations C->>O: add_(TS[int], TS[int]) O->>H: _check_overloads() H->>I: Try resolve each overload I-->>H: Candidates with ranks H->>H: Sort by rank (ascending) H->>H: Select lowest rank H-->>O: Best match: add_ints O-->>C: WiringPort ``` **Resolution Algorithm:** 1. **Collect Candidates:** Try to resolve each overload's signature against provided arguments 2. **Calculate Ranks:** Each candidate receives a rank based on type specificity 3. **Select Best:** Choose the candidate with the lowest rank (most specific) | Outcome | Action | |---------|--------| | One clear winner | Use that overload | | Multiple same rank | Raise ambiguity error | | No candidates | Raise "no overload found" error | ### 2.5 Generic Rank System The rank quantifies type specificity. Lower rank = more specific = preferred. ```mermaid graph LR subgraph "Rank Examples" R0["TS[int] → rank ≈ 1e-10
(most specific)"] R1["TS[SCALAR] → rank ≈ 1.0"] R2["TIME_SERIES_TYPE → rank ≈ 1.0+
(least specific)"] end R0 --> |"preferred over"| R1 R1 --> |"preferred over"| R2 ``` **Rank Calculation:** ```python def _calc_rank(signature: WiringNodeSignature) -> float: if signature.node_type == WiringNodeType.OPERATOR: return 1e6 # Operators themselves have lowest priority ranks = [] for k, t in signature.input_types.items(): if signature.defaults.get(k) != AUTO_RESOLVE: if t.is_scalar: rank = scale_rank(t.generic_rank, 0.001) # Scalars scaled down elif k in (signature.var_arg, signature.var_kwarg): rank = scale_rank(t.generic_rank, 100.0) # Var args scaled up else: rank = t.generic_rank # Time-series get full rank ranks.append(rank) return sum(combine_ranks(ranks).values()) ``` **Type Rank Values:** | Type | Rank | Notes | |------|------|-------| | Concrete (`int`, `str`) | ~1e-10 | Very specific | | TypeVar (`SCALAR`) | ~1.0 | Generic | | `TIME_SERIES_TYPE` | ~1.0 | Generic | | Nested generics | Scaled | Combines inner ranks | ### 2.6 Real-World Example: Number Operators From `hgraph/_impl/_operators/_number_operators.py`: ```python from hgraph import add_, sub_, mul_, div_, compute_node, TS, NUMBER, NUMBER_2 from hgraph._operators._operators import DivideByZero @compute_node(overloads=add_) def add_float_to_int(lhs: TS[int], rhs: TS[float]) -> TS[float]: """Adds a timeseries of float to a timeseries of int.""" return lhs.value + rhs.value @compute_node(overloads=add_) def add_int_to_float(lhs: TS[float], rhs: TS[int]) -> TS[float]: """Adds a timeseries of int to a timeseries of float.""" return lhs.value + rhs.value @compute_node(overloads=div_) def div_numbers( lhs: TS[NUMBER], rhs: TS[NUMBER_2], divide_by_zero: DivideByZero = DivideByZero.ERROR ) -> TS[float]: """Divides numeric timeseries with configurable zero handling.""" try: return lhs.value / rhs.value except ZeroDivisionError: match divide_by_zero: case DivideByZero.NAN: return float("NaN") case DivideByZero.INF: return float("inf") case DivideByZero.NONE: return None # No output case DivideByZero.ZERO: return 0.0 case _: raise ``` ### 2.7 Python Operator Mapping Operators are mapped to Python special methods: ```python WiringPort.__add__ = lambda x, y: add_(x, y) WiringPort.__sub__ = lambda x, y: sub_(x, y) WiringPort.__mul__ = lambda x, y: mul_(x, y) # ... etc ``` ### 2.8 Import Requirements Overloads must be imported before use. The typical pattern: ```python # In package __init__.py from my_package._impl._operators import ( add_ints, add_floats, add_strings, # ... all overloads ) ``` --- ## 3. Custom Resolvers ### 3.1 AUTO_RESOLVE Sentinel `AUTO_RESOLVE` marks parameters that should be automatically resolved from input types: ```python AUTO_RESOLVE = object() # Sentinel value @compute_node def my_node(ts: TS[SCALAR], _tp: type[SCALAR] = AUTO_RESOLVE) -> TS[SCALAR]: # _tp will be automatically resolved to the actual SCALAR type return ts.value ``` **Usage Pattern:** - Used as default value for type parameters - Wiring system resolves actual type from TypeVar resolution - Common for extracting types from resolved inputs ### 3.2 Custom Resolver Functions The `resolvers` parameter provides custom type resolution logic: ```python @compute_node( resolvers={SCALAR: lambda mapping, attr: mapping[COMPOUND_SCALAR].meta_data_schema[attr].py_type} ) def getattr_cs(ts: TS[COMPOUND_SCALAR], attr: str, default: TS[SCALAR] = None) -> TS[SCALAR]: return getattr(ts.value, attr, default.value) ``` ### 3.3 Resolver Function Signatures **Style 1: Single Parameter (Mapping Only)** ```python lambda mapping: resolved_type ``` Resolver receives only the resolution mapping dictionary. **Style 2: Multiple Parameters (Mapping + Scalars)** ```python lambda mapping, param1, param2: resolved_type ``` Resolver receives mapping plus named scalar parameters from the function. ### 3.4 Resolution Process ```mermaid sequenceDiagram participant W as Wiring participant S as Signature participant R as Resolvers W->>S: build_resolution_dict() S->>S: Resolve input types S->>S: Build resolution mapping S->>R: Call custom resolvers Note over R: Resolver receives:
- mapping dict
- scalar params R-->>S: Resolved type S->>S: Validate resolved type S-->>W: Complete resolution ``` ### 3.5 Resolver Examples **Example 1: Attribute Type Resolution** From `hgraph/_impl/_operators/_getattr.py`: ```python @compute_node( overloads=getattr_, resolvers={SCALAR: lambda mapping, attr: mapping[COMPOUND_SCALAR].meta_data_schema[attr].py_type}, ) def getattr_cs(ts: TS[COMPOUND_SCALAR], attr: str, default_value: TS[SCALAR] = None) -> TS[SCALAR]: """Get attribute from compound scalar, resolving output type from schema.""" attr_value = getattr(ts.value, attr, default_value.value) return default_value.value if attr_value is None else attr_value @compute_node( overloads=setattr_, resolvers={SCALAR: lambda mapping, attr: mapping[COMPOUND_SCALAR].meta_data_schema[attr].py_type}, ) def setattr_cs(ts: TS[COMPOUND_SCALAR], attr: str, value: TS[SCALAR]) -> TS[COMPOUND_SCALAR]: """Set attribute on compound scalar.""" v = copy(ts.value) setattr(v, attr, value.value) return v ``` **Example 2: Tuple Element Type Resolution** From `hgraph/_impl/_operators/_tuple_operators.py`: ```python def _item_type(tuple_tp: Type[TUPLE], index: int) -> Type: """Resolve the type of a tuple element at given index.""" if isinstance(tuple_tp, HgTupleFixedScalarType): return tuple_tp.element_types[index] elif isinstance(tuple_tp, HgTupleCollectionScalarType): return tuple_tp.element_type raise IncorrectTypeBinding(TUPLE, tuple_tp) @compute_node( overloads=getitem_, resolvers={SCALAR: lambda mapping, key: _item_type(mapping[TUPLE], key)} ) def getitem_tuple_fixed(ts: TS[TUPLE], key: int) -> TS[SCALAR]: """Get element from tuple at index, with type resolution.""" return ts.value[key] ``` **Example 3: DataFrame Schema Resolution** From `hgraph/_impl/_operators/_data_frame_operators.py`: ```python def _cs_from_frame(mapping, df: pl.DataFrame) -> COMPOUND_SCALAR: """Create compound scalar type from DataFrame schema.""" schema = df.schema return compound_scalar(**{k: _convert_type(v) for k, v in schema.items()}) def _extract_scalar(mapping, df, value_col) -> SCALAR: """Extract scalar type from DataFrame column.""" schema = df.schema return _convert_type(schema[value_col]) @generator( overloads=from_data_frame, resolvers={SCALAR: _extract_scalar, COMPOUND_SCALAR: _cs_from_frame}, ) def from_data_frame_ts( df: Frame[COMPOUND_SCALAR], dt_col: str = "date", value_col: str = "value", _df_tp: type[COMPOUND_SCALAR] = AUTO_RESOLVE, ) -> TS[SCALAR]: """Generate time-series from DataFrame column.""" # ... implementation ``` **Example 4: Function Return Type Resolution** ```python @compute_node( valid=("fn",), resolvers={TIME_SERIES_TYPE: lambda m, fn: TS[fn.__annotations__["return"]]} ) def apply(fn: TS[Callable], *args: TSB[TS_SCHEMA]) -> DEFAULT[TIME_SERIES_TYPE]: """Apply callable, resolving output type from function annotation.""" fn_ = fn.value return fn_(*args) ``` ### 3.6 Unit Test Example ```python def test_func_resolve(): def x(x) -> str: return str(x) @compute_node( resolvers={SCALAR_1: lambda mapping, f: f.__annotations__["return"]} ) def call(ts: TS[SCALAR], f: type(x)) -> TS[SCALAR_1]: return f(ts.value) assert eval_node(call[SCALAR:int], [1, 2], f=x) == ["1", "2"] ``` --- ## 4. Services Services enable request-reply and publish-subscribe patterns within HGraph. ### 4.1 Service Types Overview ```mermaid graph TD SVC[Service Types] SVC --> SUB[Subscription Service] SVC --> REF[Reference Service] SVC --> RR[Request-Reply Service] SUB --> |"1:N streaming"| SUB_DESC[Client subscribes,
receives stream] REF --> |"Shared state"| REF_DESC[Single value,
all clients share] RR --> |"1:1 response"| RR_DESC[Request in,
response out] ``` ### 4.2 Subscription Service A streaming service where clients subscribe with keys and receive ongoing updates. **Definition:** ```python @subscription_service def my_subs_service(path: str, subscription: TS[str]) -> TS[str]: """The service description.""" ``` **Implementation:** ```python @service_impl(interfaces=my_subs_service) def my_subs_service_impl(subscription: TSS[str]) -> TSD[str, TS[str]]: """Implementation receives set of subscriptions, returns dict of results.""" return map_(pass_through_node, __keys__=subscription, __key_arg__="ts") ``` **Complete Working Example:** ```python from hgraph import ( subscription_service, service_impl, graph, register_service, default_path, TS, TSS, TSD, TSL, SIZE, map_, pass_through ) @subscription_service def my_subs_service(path: str, subscription: TS[str]) -> TS[str]: """Subscribe to receive string updates.""" @graph def subscription_instance(key: TS[str]) -> TS[str]: return key @service_impl(interfaces=my_subs_service) def my_subs_service_impl(subscription: TSS[str]) -> TSD[str, TS[str]]: return map_(pass_through_node, __keys__=subscription, __key_arg__="ts") @graph def main(sub1: TS[str], sub2: TS[str], sub3: TS[str]) -> TSL[TS[str], SIZE]: register_service(default_path, my_subs_service_impl) return TSL.from_ts( pass_through_node(my_subs_service(default_path, sub1)), pass_through_node(my_subs_service(default_path, sub2)), pass_through_node(my_subs_service(default_path, sub3)), ) # Usage: # eval_node(main, ["topic1", None, None], ["topic2", None, None], [None, None, "topic1"]) # Result: [None, {0: "topic1", 1: "topic2"}, {2: "topic1"}] ``` **Characteristics:** - Input: Individual `TS[T]` per client - Implementation receives: `TSS[T]` (set of subscriptions) - Implementation returns: `TSD[T, TS[Result]]` (keyed results) ### 4.3 Reference Service A service producing a single shared value independent of requesters. **Definition:** ```python @reference_service def my_service(path: str = None) -> TSD[str, TS[str]]: """The service description.""" ``` **Implementation:** ```python @service_impl(interfaces=my_service) def my_service_impl() -> TSD[str, TS[str]]: return const(frozendict({"test": "a value"}), TSD[str, TS[str]]) ``` **Complete Working Example:** ```python from hgraph import ( reference_service, service_impl, graph, register_service, default_path, TS, TSD, const, frozendict ) @reference_service def config_service(path: str = None) -> TSD[str, TS[str]]: """Get shared configuration.""" @service_impl(interfaces=config_service) def config_service_impl() -> TSD[str, TS[str]]: return const(frozendict({"api_key": "secret", "endpoint": "https://api.example.com"})) @graph def main() -> TS[str]: register_service(default_path, config_service_impl) config = config_service() return config["api_key"] # Usage: eval_node(main) == ["secret"] ``` **Characteristics:** - Single output shared by all clients - Output type automatically wrapped as `REF[TIME_SERIES_TYPE]` - Auto-binds to `ref_svc://.` if no path provided ### 4.4 Request-Reply Service A synchronous request-response pattern with timeout handling. **Definition:** ```python @request_reply_service def add_one_service(path: str, ts: TS[int]) -> TS[int]: """The service description.""" ``` **Implementation:** ```python @service_impl(interfaces=add_one_service) def add_one_service_impl(ts: TSD[int, TS[int]]) -> TSD[int, TS[int]]: return map_(lambda x: x + 1, ts) ``` **Complete Working Example:** ```python from hgraph import ( request_reply_service, service_impl, graph, register_service, default_path, TS, TSD, map_ ) @request_reply_service def add_service(path: str, ts: TS[int], ts1: TS[int]) -> TS[int]: """Add two numbers via service.""" @service_impl(interfaces=add_service) def add_service_impl(ts: TSD[int, TS[int]], ts1: TSD[int, TS[int]]) -> TSD[int, TS[int]]: return map_(lambda x, y: x + y, ts, ts1) @graph def main(x: TS[int], y: TS[int]) -> TS[int]: register_service(default_path, add_service_impl) return add_service(default_path, x, y) # Usage: eval_node(main, [1], [2]) == [None, None, 3] # Note: Two None ticks for service setup latency ``` **Response Type:** ```python TSB[ReqRepResponse[TIME_SERIES_TYPE_1]] # Contains: # result: TIME_SERIES_TYPE_1 # time_out: TS[bool] # error: TS[str] ``` ### 4.5 Multi-Service Implementation A single implementation can serve multiple service interfaces: ```python from hgraph import ( request_reply_service, reference_service, subscription_service, service_impl, get_service_inputs, set_service_output ) @request_reply_service def submit(path: str, ts: TS[int]): ... @reference_service def receive(path: str) -> TSS[int]: ... @subscription_service def subscribe(path: str, ts: TS[int]) -> TS[bool]: ... @service_impl(interfaces=(submit, receive, subscribe)) def impl(path: str): """Single implementation handles all three service interfaces.""" submissions: TSD[int, TS[int]] = get_service_inputs(path, submit).ts items = flip(submissions).key_set set_service_output(path, receive, items) set_service_output( path, subscribe, map_( lambda key, i: contains_(i, key), __keys__=subscribe.wire_impl_inputs_stub(path).ts, i=pass_through_node(items), ), ) ``` **Helper Functions:** | Function | Purpose | |----------|---------| | `get_service_inputs(path, service)` | Retrieve the combined inputs for a service interface (returns struct with input fields) | | `set_service_output(path, service, output)` | Set the output for a service interface to broadcast to clients | ### 4.6 Service Registration ```python from hgraph import register_service, default_path # Register implementation for a path register_service("my_path", my_impl, resolution_dict={...}) # Use default path register_service(default_path, my_impl) ``` **Path Format:** `:///` --- ## 5. Adaptors Adaptors provide bidirectional connectivity between HGraph and external systems. ### 5.1 Single-Client Adaptor For point-to-point external communication. **Interface Definition:** ```python @adaptor def my_adaptor(path: str, ts: TS[int]) -> TS[int]: """Send data out, receive data in.""" ``` **Implementation:** ```python @adaptor_impl(interfaces=my_adaptor) def my_adaptor_impl(path: str, ts: TS[int]) -> TS[int]: queue_sink(path, ts) # Send to external return queue_source(path) # Receive from external ``` **Complete Working Example:** ```python from hgraph import ( adaptor, adaptor_impl, graph, register_adaptor, TS, queue_sink, queue_source, count, schedule, stop_on_value, evaluate_graph, GraphConfiguration, EvaluationMode ) from datetime import timedelta @adaptor def my_adaptor(path: str, ts: TS[int]) -> TS[int]: """Echo adaptor - sends and receives integers.""" @adaptor_impl(interfaces=my_adaptor) def my_adaptor_impl(path: str, ts: TS[int]) -> TS[int]: queue_sink(path, ts) return queue_source(path) @graph def g() -> TS[int]: register_adaptor("test_adaptor", my_adaptor_impl) result = my_adaptor( "test_adaptor", count(schedule(timedelta(milliseconds=10), max_ticks=10)) ) stop_on_value(result, 10) return result # Execute in real-time mode result = evaluate_graph( g, GraphConfiguration( run_mode=EvaluationMode.REAL_TIME, end_time=timedelta(milliseconds=1000) ) ) # result: [(t1, 1), (t2, 2), ..., (t10, 10)] ``` **With Parameters:** ```python @adaptor def my_adaptor(path: str, b: bool, ts: TS[int]) -> TS[int]: ... @adaptor_impl(interfaces=my_adaptor) def my_adaptor_impl(path: str, b: bool, ts: TS[int]) -> TS[int]: path = f"{path}_{b}" # Use parameter to customize path queue_sink(path, ts if b else ts + 1) return queue_source(path) ``` ### 5.2 Multi-Client Service Adaptor For multiple concurrent client connections. **Interface Definition:** ```python @service_adaptor def my_adaptor(path: str, ts: TS[int]) -> TS[int]: """Handle multiple client requests.""" ``` **Implementation:** ```python @service_adaptor_impl(interfaces=my_adaptor) def my_adaptor_impl(path: str, ts: TSD[int, TS[int]]) -> TSD[int, TS[int]]: """Implementation receives multiplexed by ClientId.""" tsd_queue_sink(path, ts) return tsd_queue_source(path) ``` **Complete Working Example:** ```python from hgraph import ( service_adaptor, service_adaptor_impl, graph, register_adaptor, TS, TSD, TSL, Size, tsd_queue_sink, tsd_queue_source, map_, count, schedule, combine, stop_on_tsl_values, evaluate_graph ) from datetime import timedelta @service_adaptor def my_adaptor(path: str, ts: TS[int]) -> TS[int]: ... @service_adaptor_impl(interfaces=my_adaptor) def my_adaptor_impl(path: str, b: bool, ts: TSD[int, TS[int]]) -> TSD[int, TS[int]]: path = f"{path}_{b}" tsd_queue_sink(path, ts if b else map_(lambda x: x + 1, ts)) return tsd_queue_source(path) @graph def g() -> TSL[TS[int], Size[2]]: register_adaptor(None, my_adaptor_impl, b=False) a1 = my_adaptor("test", ts=count(schedule(timedelta(milliseconds=10), max_ticks=10))) a2 = my_adaptor("test", ts=count(schedule(timedelta(milliseconds=11), max_ticks=10))) result = combine(a1, a2) stop_on_tsl_values(result, 11, 11) return result ``` **Characteristics:** - Each client gets unique integer `ClientId` - Requests combined as `TSD[ClientId, Request]` - Responses multiplexed as `TSD[ClientId, Response]` **Sink-Only Adaptor:** ```python @service_adaptor def my_adaptor(path: str, ts: TS[int]): """Sink-only - no return value.""" @service_adaptor_impl(interfaces=my_adaptor) def my_adaptor_impl(path: str, ts: TSD[int, TS[int]]): log_(f"{path}: {{}}", ts) ``` --- ## 6. Components Components are reusable graph building blocks with record/replay support. ### 6.1 Definition ```python @component def my_component(ts: TS[float], key: str) -> TS[float]: """A reusable computation component.""" return ts + 1.0 # Usage: # eval_node(my_component, ts=[1.0, 2.0, 3.0], key="key_1") == [2.0, 3.0, 4.0] ``` ### 6.2 With Dynamic Recordable ID ```python @component(recordable_id="Test_{key}") def my_component(ts: TS[float], key: TS[str]) -> TS[float]: """Component with dynamic recordable ID based on key parameter.""" return ts + 1.0 # The recordable_id supports format strings from time-series string (TS[str]) parameters ``` ### 6.3 Complete Record/Replay Example ```python from hgraph import ( component, graph, TS, TSD, map_, add_, RecordReplayContext, RecordReplayEnum, set_record_replay_model, IN_MEMORY, GlobalState, frozendict as fd ) @component def my_component(a: TSD[str, TS[float]], b: TSD[str, TS[float]]) -> TSD[str, TS[float]]: """Component that adds two TSDs element-wise.""" return map_(add_[TS[float]], a, b) def test_record_replay(): with GlobalState() as gs: set_record_replay_model(IN_MEMORY) # Record execution with RecordReplayContext(mode=RecordReplayEnum.RECORD): result = eval_node( my_component, a=[fd(a=1.0, b=2.0), fd(a=2.0)], b=[fd(a=3.0, b=2.0), fd(b=1.0)] ) assert result == [fd(a=4.0, b=4.0), fd(a=5.0, b=3.0)] # Replay recorded data with RecordReplayContext(mode=RecordReplayEnum.REPLAY): result = eval_node(my_component, a=[], b=[]) assert result == [fd(a=4.0, b=4.0), fd(a=5.0, b=3.0)] ``` ### 6.4 Characteristics | Property | Description | |----------|-------------| | Inputs | Must have time-series inputs (required) | | Output | Must have time-series output (required) | | Record/Replay | Automatic wrapper for input/output recording | | References | Inputs/outputs converted to reference form | | Nesting | `has_nested_graphs = True` | ### 6.5 Record/Replay Integration ```mermaid graph LR IN[Inputs] --> IW[Input Wrapper] IW --> |"records"| COMP[Component Logic] COMP --> OW[Output Wrapper] OW --> |"records"| OUT[Outputs] REC[Record/Replay Context] -.-> IW REC -.-> OW ``` --- ## 7. Error Handling ### 7.1 try_except Pattern Wraps graphs with exception catching: ```python from hgraph import try_except, TSB, TryExceptResult result = try_except(risky_node, input_ts, __trace_back_depth__=2) # result: TSB[TryExceptResult[output_type]] ``` ### 7.2 Complete Working Examples **Basic try_except:** ```python from hgraph import graph, TS, TSB, TryExceptResult, try_except, div_ @graph def main(lhs: TS[float], rhs: TS[float]) -> TSB[TryExceptResult[TS[float]]]: return try_except(div_[TS[float]], lhs, rhs) result = eval_node(main, [1.0, 2.0, 3.0], [1.0, 2.0, 0.0]) # result[0] == {"out": 1.0} # result[1] == {"out": 1.0} # result[2].keys() == {"exception"} # Division by zero caught ``` **try_except with Graph:** ```python @graph def error_prone(lhs: TS[float], rhs: TS[float]) -> TS[float]: return lhs / rhs @graph def main(lhs: TS[float], rhs: TS[float]) -> TSB[TryExceptResult[TS[float]]]: return try_except(error_prone, lhs, rhs) result = eval_node(main, [1.0, 2.0], [2.0, 0.0]) # result[0]["out"] == 0.5 # result[1]["exception"] is not None ``` **try_except with Map (Multi-client errors):** ```python from hgraph import TryExceptTsdMapResult, REF schema = ts_schema(out=TSD[int, REF[TS[float]]], error=TSD[int, TS[NodeError]]) @graph def main(lhs: TSD[int, TS[float]], rhs: TSD[int, TS[float]]) -> TSB[TryExceptTsdMapResult[int, TSD[int, TS[float]]]]: return try_except(map_, div_[TS[float]], lhs, rhs) result = eval_node(main, [{0: 1.0}, {1: 2.0}, {2: 3.0}], [{0: 1.0}, {1: 2.0}, {2: 0.0}]) # result[0] == {"out": frozendict({0: 1.0})} # result[1] == {"out": frozendict({1: 1.0})} # result[2] contains "exception" for key 2 ``` **try_except with Sink Nodes:** ```python from hgraph import sink_node, NodeError @sink_node def risky_sink(ts: TS[float]): if ts.value == 2.0: raise RuntimeError("Test error") @graph def main(ts: TS[float]) -> TS[NodeError]: return try_except(risky_sink, ts) result = eval_node(main, [1.0, 2.0]) # result[0] is None # No error # result[1].error_msg.endswith("Test error") # Error captured ``` ### 7.3 exception_time_series Lightweight error extraction for single nodes: ```python from hgraph import exception_time_series, ts_schema, NodeError schema = ts_schema(out=TS[float], error=TS[NodeError]) @graph def main(lhs: TS[float], rhs: TS[float]) -> TSB[schema]: out = lhs / rhs return TSB[schema].from_ts(out=out, error=exception_time_series(out)) result = eval_node(main, [1.0, 2.0, 3.0], [1.0, 2.0, 0.0]) # result[0:2] == [{"out": 1.0}, {"out": 1.0}] # result[2].keys() == {"error"} ``` ### 7.4 Parameters | Parameter | Description | |-----------|-------------| | `__trace_back_depth__` | Stack frames to capture (default: 1) | | `__capture_values__` | Capture input values in traceback (default: False) | ### 7.5 Exception Output by Node Type | Node Type | Exception Output | |-----------|------------------| | compute_node | `TSB[TryExceptResult[output_type]]` | | sink_node | `TS[NodeError]` | | TSD map | `TSB[TryExceptTsdMapResult]` | ### 7.6 NodeError Type ```python class NodeError: """Contains exception information from node evaluation.""" exception_type: type message: str traceback: str input_values: dict | None # If __capture_values__=True ``` --- ## 8. Reference Locations | Concept | Python Location | |---------|-----------------| | @operator | `hgraph/_wiring/_decorators.py` | | OperatorWiringNodeClass | `hgraph/_wiring/_wiring_node_class/_operator_wiring_node.py` | | Generic Rank | `hgraph/_types/_generic_rank_util.py` | | AUTO_RESOLVE | `hgraph/_types/_type_meta_data.py` | | Custom Resolvers | `hgraph/_wiring/_wiring_node_signature.py` | | @subscription_service | `hgraph/_wiring/_decorators.py` | | @reference_service | `hgraph/_wiring/_decorators.py` | | @request_reply_service | `hgraph/_wiring/_decorators.py` | | @service_impl | `hgraph/_wiring/_decorators.py` | | @adaptor | `hgraph/_wiring/_decorators.py` | | @service_adaptor | `hgraph/_wiring/_decorators.py` | | @component | `hgraph/_wiring/_wiring_node_class/_component_node_class.py` | | try_except | `hgraph/_wiring/_exception_handling.py` | | NodeError | `hgraph/_types/_error_type.py` | --- ## 9. Next Steps Continue to: - [09_CONTROL_FLOW.md](09_CONTROL_FLOW.md) - Control flow constructs - [10_DATA_SOURCES.md](10_DATA_SOURCES.md) - Data source patterns