Part X: Data Sources and Record/Replay
Version: 1.0 Draft
Last Updated: 2025-12-20
1. Introduction
HGraph provides various mechanisms for data ingestion:
```mermaid
graph TD
    DS[Data Sources]
    DS --> PULL[Pull Sources]
    DS --> PUSH[Push Sources]
    DS --> RR[Record/Replay]
    PULL --> PS[pull_source_node]
    PULL --> GEN[generator]
    PULL --> CF[const_fn]
    PUSH --> PSN[push_source_node]
    PUSH --> PQ[push_queue]
    RR --> REC[record]
    RR --> REP[replay]
    RR --> CMP[compare]
```
2. Pull Source Nodes
2.1 @pull_source_node Decorator
Pull sources generate data on demand, typically on a schedule. This is the base decorator for pull-based source nodes:
```python
@pull_source_node
def clock_ticks(period: timedelta = timedelta(seconds=1)) -> TS[datetime]:
    """Emit current time periodically."""
    return datetime.now()
```
Implementation Pattern:
```python
# From hgraph/_wiring/_decorators.py
def pull_source_node(
    fn: SOURCE_NODE_SIGNATURE = None,
    /,
    node_impl=None,
    resolvers: Mapping["TypeVar", Callable] = None,
    requires: Callable[[..., ...], bool] = None,
    label: str | None = None,
    deprecated: bool | str = False,
) -> SOURCE_NODE_SIGNATURE:
    """
    Used to indicate the signature for a source node.
    For Python source nodes use either generator or source_adapter annotations.
    """
```
2.2 Scheduling Pattern
```mermaid
sequenceDiagram
    participant S as Scheduler
    participant N as Pull Source
    participant O as Output
    N->>S: start(): schedule(now + period)
    S->>N: eval() at scheduled time
    N->>N: Generate value
    N->>O: Write output
    N->>S: schedule(now + period)
    Note over S,N: Cycle continues
```
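The cycle in the diagram can be modelled in plain Python. The sketch below is a toy scheduler, not HGraph's engine: `run_pull_source` and its parameters are hypothetical names chosen for illustration, and the heap stands in for the runtime's scheduler.

```python
import heapq
from datetime import datetime, timedelta


def run_pull_source(produce, start: datetime, end: datetime, period: timedelta):
    """Toy model of a self-scheduling pull source (not HGraph's engine).

    `produce` receives the scheduled time and returns the value to emit.
    """
    scheduled = [start + period]                  # start(): schedule(now + period)
    output = []
    while scheduled and scheduled[0] <= end:
        now = heapq.heappop(scheduled)            # jump to the next scheduled time
        output.append((now, produce(now)))        # eval(): generate value, write output
        heapq.heappush(scheduled, now + period)   # schedule the next evaluation
    return output


start = datetime(2025, 1, 1)
ticks = run_pull_source(
    lambda t: t.second, start, start + timedelta(seconds=3), timedelta(seconds=1)
)
```

Each evaluation re-arms the schedule, so the source keeps ticking until the end time is passed without any input driving it.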
2.3 With Scheduler Access
Scheduler injection allows nodes to control their own timing:
```python
# From hgraph_unit_tests/_runtime/test_scheduler.py
from hgraph import compute_node, TS, SCHEDULER, MIN_TD


@compute_node
def my_scheduler(ts: TS[int], tag: str = None, _scheduler: SCHEDULER = None) -> TS[int]:
    if ts.modified:
        _scheduler.schedule(MIN_TD * ts.value, tag)
        return ts.value
    if _scheduler.is_scheduled_now:
        return -1


# Scheduler with wall clock for real-time mode
@compute_node
def my_scheduler_realtime(ts: TS[int], tag: str = None, _scheduler: SCHEDULER = None) -> TS[int]:
    if ts.modified:
        _scheduler.schedule(MIN_TD * ts.value, tag, on_wall_clock=True)
        return ts.value
    if _scheduler.is_scheduled_now:
        return -1
```
2.4 Context Wiring Pattern
```python
# From hgraph/_wiring/_context_wiring.py
@pull_source_node(node_impl=ContextNodeClass)
def get_context_output(path: str, depth: int) -> REF[CONTEXT_TIME_SERIES_TYPE]:
    """Uses the special node to extract a context output from the global state."""
```
2.5 Characteristics
| Property | Value |
|---|---|
| Inputs | Scalars only (no time-series) |
| Output | Required |
| Evaluation | Self-scheduled |
| Mode | Both simulation and real-time |
3. Push Source Nodes
3.1 @push_source_node Decorator
Push sources receive asynchronous external events:
```python
@push_source_node
def websocket_events() -> TS[str]:
    """Receive websocket messages."""
    ...
```
3.2 Sender Pattern
```mermaid
sequenceDiagram
    participant E as External Thread
    participant S as Sender
    participant Q as Event Queue
    participant EN as Engine
    participant N as Push Source
    N->>S: start() returns sender
    E->>S: send(value)
    S->>Q: Enqueue event
    Q->>EN: Signal new event
    EN->>N: eval()
    N->>N: Retrieve value from sender
    N->>N: Write to output
```
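The hand-off in the diagram is essentially a thread-safe queue between an external producer and the engine loop. The sketch below models that with the standard library only; `PushSourceModel` is a hypothetical class for illustration and does not reflect how HGraph implements its sender or event queue.

```python
import queue
import threading


class PushSourceModel:
    """Toy push source: a thread-safe queue between a sender and an engine loop."""

    def __init__(self):
        self._events = queue.Queue()   # thread-safe event queue
        self.output = []

    def start(self):
        # start() hands back the callable external threads use to inject values
        return self._events.put

    def run(self, expected: int, timeout: float = 1.0):
        # Engine loop: block until an event is signalled, then "evaluate" the node
        for _ in range(expected):
            value = self._events.get(timeout=timeout)
            self.output.append(value)  # write to output


node = PushSourceModel()
sender = node.start()
producer = threading.Thread(target=lambda: [sender(m) for m in ("a", "b", "c")])
producer.start()
node.run(expected=3)
producer.join()
```

With a single producer the queue preserves send order, so `node.output` ends up as `["a", "b", "c"]`. The arrival times depend on the producer thread, which is why this pattern is confined to real-time mode.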
3.3 Characteristics
| Property | Value |
|---|---|
| Inputs | Scalars only (no time-series) |
| Output | Required |
| Evaluation | Triggered by external event |
| Mode | Real-time only (breaks simulation determinism) |
4. Generator
4.1 @generator Decorator
Generators create pull sources from Python generator functions. This is the most common way to implement pull source nodes:
```python
@generator
def signal() -> TS[bool]:
    while True:
        yield (timedelta(milliseconds=1), True)
```
4.2 Yield Format
A generator yields (time, value) tuples, where the time is either a timedelta (relative) or a datetime (absolute):
```python
# Yield with timedelta (relative time)
@generator
def countdown(start: int) -> TS[int]:
    for i in range(start, 0, -1):
        yield (timedelta(seconds=1), i)
    yield (timedelta(seconds=1), 0)
    # Generator ends, source stops


# Yield with datetime (absolute time)
@generator
def timed_events(_clock: EvaluationClock = None) -> TS[str]:
    for i in range(5):
        yield _clock.next_cycle_evaluation_time, f"event_{i}"
```
4.3 Real-World Examples
Example 1: const() Implementation
```python
# From hgraph/_impl/_operators/_time_series_conversion.py
@generator(overloads=const)
def const_default(
    value: SCALAR,
    tp: type[OUT] = TS[SCALAR],
    delay: timedelta = timedelta(),
    _api: EvaluationEngineApi = None,
) -> DEFAULT[OUT]:
    """
    Produces a single tick at the start of the graph evaluation.

    :param value: The value to emit
    :param tp: Output type, defaults to TS[SCALAR]
    :param delay: Amount of time to delay the value
    :param _api: Evaluation API (injected)
    :return: A single tick of the value supplied
    """
    yield _api.start_time + delay, value
```
Example 2: nothing() Implementation
```python
# From hgraph/_impl/_operators/_graph_operators.py
@generator(overloads=nothing)
def nothing_impl(tp: type[OUT] = AUTO_RESOLVE) -> DEFAULT[OUT]:
    """
    Produces no ticks ever.
    """
    yield from ()
```
Example 3: schedule() Implementation
```python
# From hgraph/_impl/_operators/_stream_operators.py
@generator(overloads=schedule)
def schedule_scalar(delay: timedelta, initial_delay: bool = True, max_ticks: int = sys.maxsize) -> TS[bool]:
    """Emit True at regular intervals."""
    initial_timedelta = delay if initial_delay else timedelta()
    yield (initial_timedelta, True)
    for _ in range(max_ticks - 1):
        yield (delay, True)
```
Example 4: Generator with Resolvers (DataFrame Integration)
```python
# From hgraph/adaptors/data_frame/_data_source_generators.py
@generator(resolvers={SCALAR: _extract_scalar})
def ts_from_data_source(
    dfs: type[DATA_FRAME_SOURCE],
    dt_col: str = "date",
    value_col: str = "value",
    offset: timedelta = timedelta(),
    _api: EvaluationEngineApi = None,
) -> TS[SCALAR]:
    """Stream values from a DataFrame source."""
    df: pl.DataFrame
    dfs_instance = DataStore.instance().get_data_source(dfs)
    dt_converter = _dt_converter(dfs_instance.schema[dt_col])
    for df in dfs_instance.iter_frames(start_time=_api.start_time, end_time=_api.end_time):
        if df.is_empty():
            continue
        for dt, value in df.select([dt_col, value_col]).iter_rows(named=False):
            dt = dt_converter(dt)
            yield dt + offset, value
```
Example 5: Generator with TSB (Bundle) Output
```python
# From hgraph/adaptors/data_frame/_data_source_generators.py
@generator(resolvers={TS_SCHEMA: _extract_schema})
def tsb_from_data_source(
    dfs: type[DATA_FRAME_SOURCE],
    dt_col: str,
    offset: timedelta = timedelta(),
    _api: EvaluationEngineApi = None,
) -> TSB[TS_SCHEMA]:
    """Stream rows from DataFrame as bundles."""
    dfs_instance = DataStore.instance().get_data_source(dfs)
    dt_converter = _dt_converter(dfs_instance.schema[dt_col])
    for df in dfs_instance.iter_frames(start_time=_api.start_time, end_time=_api.end_time):
        for value in df.iter_rows(named=True):
            dt = dt_converter(value.pop(dt_col))
            yield dt + offset, value
```
Example 6: Generator in Service Implementation
```python
# From hgraph_unit_tests/_wiring/test_service.py
@service_impl(interfaces=(data,))
@generator
def impl() -> TS[str]:
    yield MIN_ST, "Test"
```
4.4 Execution Model
```mermaid
sequenceDiagram
    participant G as Generator
    participant R as Runtime
    participant O as Output
    G->>G: Initialize generator
    loop Until StopIteration
        G->>R: yield (delta, value)
        R->>R: Schedule: now + delta
        R->>O: Write value
        R->>R: Wait for scheduled time
        R->>G: Resume generator
    end
    G->>R: StopIteration
    Note over R: Generator complete
```
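The loop in the diagram can be approximated with a small driver that consumes a plain Python generator. This is only a sketch under simplifying assumptions: `drive` is a hypothetical helper, time is simulated by jumping a variable forward, and the duplicate-time check raises `ValueError` here rather than whatever exception the real runtime uses.

```python
from datetime import datetime, timedelta


def drive(gen, start: datetime, end: datetime):
    """Toy driver for generators yielding (timedelta | datetime, value) pairs."""
    now = start
    last_emitted = None
    emitted = []
    for when, value in gen:
        # A timedelta is relative to the current time; a datetime is absolute
        scheduled = now + when if isinstance(when, timedelta) else when
        if scheduled > end:
            break
        if scheduled == last_emitted:
            raise ValueError(f"duplicate time {scheduled}")
        now = last_emitted = scheduled       # "wait" by jumping the simulated clock
        emitted.append((scheduled, value))   # write the value at the scheduled time
    return emitted


def countdown(n: int):
    for i in range(n, -1, -1):
        yield timedelta(seconds=1), i


t0 = datetime(2025, 1, 1)
ticks = drive(countdown(2), t0, t0 + timedelta(minutes=1))
```

The generator is resumed only after its previous value has been placed on the timeline, so each relative delay is measured from the preceding tick.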
4.5 Injectable Parameters
Generators can request runtime objects as injected parameters:

| Parameter | Type | Description |
|---|---|---|
| | | Access to start/end time |
| | | Access to current evaluation time |
| | | Node metadata |
| | | Stateful storage |
4.6 Error Handling
```python
# Duplicate time yields raise errors
@generator
def play_duplicate() -> TS[int]:
    yield MIN_ST, 1
    yield MIN_ST, 2  # Raises NodeException - duplicate time not allowed
```
5. Push Queue
5.1 @push_queue Decorator
Push queues enable asynchronous external data injection with thread-safe semantics:
```python
import threading
import time
from typing import Callable

from hgraph import push_queue, TS


@push_queue(TS[str])
def my_message_sender(sender: Callable[[str], None], values: tuple[str, ...]):
    """
    The sender callable is provided by the runtime.
    Call sender(value) from any thread to inject data into the graph.
    """
    def _sender(values):
        for value in values:
            sender(value)
            time.sleep(0.1)

    threading.Thread(target=_sender, args=(values,)).start()
```
5.2 Real-World Examples
Example 1: Interactive User Input
```python
# From docs_md/quick_start/push_source_node.py
import sys
import threading
from typing import Callable
from datetime import timedelta

from hgraph import push_queue, TS, graph, evaluate_graph, GraphConfiguration, EvaluationMode, debug_print, if_true, stop_engine


def _user_input(sender: Callable[[str], None]):
    while True:
        s = sys.stdin.readline().strip("\n")
        sender(s)
        if s == "exit":
            break


@push_queue(TS[str])
def user_input(sender: Callable[[str], None]):
    threading.Thread(target=_user_input, args=(sender,)).start()


@graph
def main():
    in_ = user_input()
    debug_print(">", in_)
    stop_engine(if_true(in_ == "exit"))


evaluate_graph(main, GraphConfiguration(run_mode=EvaluationMode.REAL_TIME, end_time=timedelta(minutes=2)))
```
Example 2: Basic String Messages
```python
# From hgraph_unit_tests/nodes/test_push_queue.py
def test_push_queue():
    def _sender(sender: Callable[[str], None], values: [str]):
        for value in values:
            sender(value)
            time.sleep(0.1)

    @push_queue(TS[str])
    def my_message_sender(sender: Callable[[str], None], values: tuple[str, ...]):
        threading.Thread(target=_sender, args=(sender, values)).start()

    @graph
    def main():
        messages = my_message_sender(("1", "2", "3"))
        record(messages)
        stop_engine(if_true(messages == const("3")), "Completed Processing request")

    with GlobalState():
        evaluate_graph(main, GraphConfiguration(run_mode=EvaluationMode.REAL_TIME, end_time=timedelta(seconds=1)))
        values = get_recorded_value()
        assert [v[1] for v in values] == ["1", "2", "3"]
```
5.3 Batch Mode
With batch=True, values that are queued between engine cycles are delivered together as a single tuple:

```python
# From hgraph_unit_tests/nodes/test_push_queue.py
def test_batch_push_queue():
    def _sender(sender: Callable[[str], None], values: [str]):
        for value in values:
            sender(value)

    @push_queue(TS[Tuple[str, ...]])
    def my_message_sender(sender: Callable[[str], None], values: tuple[str, ...], batch: bool = True):
        threading.Thread(target=_sender, args=(sender, values)).start()

    @graph
    def main():
        messages = my_message_sender(("1", "2", "3"))
        record(messages)
        stop_engine(if_true(contains_(messages, const("3"))))

    with GlobalState():
        evaluate_graph(main, GraphConfiguration(run_mode=EvaluationMode.REAL_TIME, end_time=timedelta(seconds=1)))
        values = get_recorded_value()
        # All values batched together in one tuple
        assert [v[1] for v in values] == [("1", "2", "3")]
```
5.4 Elide Mode
With elide=True, intermediate values are skipped and only the latest queued value is forwarded:

```python
# From hgraph_unit_tests/nodes/test_push_queue.py
def test_elide_push_queue():
    def _sender(sender: Callable[[str], None], values: [str]):
        for value in values:
            sender(value)

    @push_queue(TS[str])
    def my_message_sender(sender: Callable[[str], None], values: tuple[str, ...], elide: bool = True):
        threading.Thread(target=_sender, args=(sender, values)).start()

    @graph
    def main():
        messages = my_message_sender(("1", "2", "3"))
        record(messages)
        stop_engine(if_true(messages == const("3")))

    with GlobalState():
        evaluate_graph(main, GraphConfiguration(run_mode=EvaluationMode.REAL_TIME, end_time=timedelta(seconds=1)))
        values = get_recorded_value()
        # Only final value recorded (intermediate values elided)
        assert [v[1] for v in values] == ["3"]
```
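The difference between the default, batch and elide behaviours (sections 5.3 and 5.4) comes down to what happens to values that pile up before the engine next evaluates the node. The function below is a toy model of that decision, assuming all three values are already queued when the cycle runs; `drain` is a hypothetical name, not an HGraph API, and in the real runtime the grouping depends on thread timing.

```python
from typing import Any


def drain(pending: list[Any], batch: bool = False, elide: bool = False) -> list[Any]:
    """Toy model: turn the values queued before one engine cycle into output ticks."""
    if not pending:
        return []
    if batch:
        return [tuple(pending)]   # one tick carrying every queued value
    if elide:
        return [pending[-1]]      # one tick carrying only the latest value
    return list(pending)          # default: one tick per queued value


queued = ["1", "2", "3"]
per_value = drain(queued)
batched = drain(queued, batch=True)
elided = drain(queued, elide=True)
```

The three results line up with the assertions in the tests above: three separate ticks, one tuple tick, and a single tick holding the last value.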
5.5 TSD (Time Series Dictionary) Queue
For multi-keyed data streams:
```python
# From hgraph_unit_tests/nodes/test_push_queue.py
def test_tsd_push_queue():
    def _sender(sender: Callable[[str, float], None], values: Tuple[dict[str, float]]):
        for value in values:
            sender(value)
            time.sleep(0.01)

    @push_queue(TSD[str, TS[float]])
    def my_message_sender(sender: Callable[[str, float], None], values: tuple[dict[str, float], ...]):
        threading.Thread(target=_sender, args=(sender, values)).start()

    @graph
    def main():
        messages = my_message_sender(({"a": 1.0}, {"b": 2.0}, {"c": 3.0}, {"a": REMOVE}, {'c': 4.0}))
        record(messages)
        stop_engine(if_true(messages['c'] == 4.0))

    with GlobalState():
        evaluate_graph(main, GraphConfiguration(run_mode=EvaluationMode.REAL_TIME, end_time=timedelta(seconds=1)))
        values = get_recorded_value()
        assert [v[1] for v in values] == [{"a": 1.0}, {"b": 2.0}, {"c": 3.0}, {"a": REMOVE}, {'c': 4.0}]
```
5.6 Integration Patterns
Kafka Integration:
```python
# From hgraph/adaptors/kafka/_impl.py
from kafka import KafkaConsumer
from typing import Callable
from threading import Thread, Event


@push_queue(TS[bytes])
def _message_subscriber_queue(sender: Callable[[SCALAR], None] = None, *, topic: str):
    KafkaMessageState.instance().set_subscriber_sender(topic, sender)


class KafkaConsumerThread(Thread):
    def __init__(self, topic, consumer: KafkaConsumer, sender: Callable[[bytes], None]):
        super().__init__()
        self.topic = topic
        self.consumer = consumer
        self.sender = sender
        self._stop_event = Event()

    def run(self):
        try:
            while not self._stop_event.is_set():
                records = self.consumer.poll(timeout_ms=1000, max_records=1000)
                all_messages = [m for tp, messages in records.items() for m in messages]
                if len(records) > 1:
                    all_messages = sorted(all_messages, key=lambda m: (m.timestamp, m.topic, m.offset))
                for msg in all_messages:
                    self.sender(msg.value)  # Inject into graph
        except Exception:
            error(f"Failure in Kafka on topic: {self.topic}", exc_info=True)
        finally:
            self.consumer.close()

    def stop(self):
        self._stop_event.set()
```
WebSocket Server Integration:
```python
# From hgraph/adaptors/tornado/websocket_server_adaptor.py
@push_queue(TSD[int, TS[WebSocketConnectRequest]])
def connections_from_web(
    sender, path: str = "tornado_websocket_server_adaptor", elide: bool = True
) -> TSD[int, TS[WebSocketConnectRequest]]:
    GlobalState.instance()[f"websocket_server_adaptor://{path}/connect_queue"] = sender
    return None


@push_queue(TSD[int, TS[tuple[STR_OR_BYTES, ...]]])
def messages_from_web(
    sender, path: str = "tornado_websocket_server_adaptor", batch: bool = True
) -> TSD[int, TS[tuple[bytes, ...]]]:
    GlobalState.instance()[f"websocket_server_adaptor://{path}/message_queue"] = sender
    return None
```
5.7 Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| | | Required | Output type specification |
| | | | Collect values into tuples |
| | | | Skip intermediate updates |
6. const_fn
6.1 Overview
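const_fn wraps a pure scalar function so that it can be used in two ways: as a constant time-series source inside a graph, and as an ordinary function returning a scalar outside one:

```python
@const_fn
def calculate(a: int, b: int) -> TS[int]:
    return a + b
```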
6.2 Real-World Examples
Example 1: Basic Usage
```python
# From hgraph_unit_tests/_wiring/_test_const_fn.py
from hgraph import const_fn, TS, graph, const
from hgraph.test import eval_node


def test_const_fn_as_ts():
    @const_fn
    def my_const_fn(a: int, b: int) -> TS[int]:
        return a + b

    assert eval_node(my_const_fn, 1, 2) == [3]


def test_const_fn_as_scalar():
    @const_fn
    def my_const_fn(a: int, b: int) -> TS[int]:
        return a + b

    # Outside graph: returns scalar directly
    assert my_const_fn(1, 2) == 3
```
Example 2: Value Access in Graph
```python
# From hgraph_unit_tests/_wiring/_test_const_fn.py
def test_const_fn_value_in_graph():
    @const_fn
    def my_const_fn(a: int, b: int) -> TS[int]:
        return a + b

    @graph
    def my_graph() -> TS[bool]:
        # Access .value during wiring for conditional logic
        if my_const_fn(1, 2).value == 3:
            return const(True)
        else:
            return const(False)

    assert eval_node(my_graph) == [True]
```
Example 3: Operator Overloading
```python
# From hgraph_unit_tests/_wiring/_test_const_fn.py
from hgraph import operator, TIME_SERIES_TYPE


def test_const_fn_resolution():
    @operator
    def my_const_operator(tp: type[TIME_SERIES_TYPE]) -> TS[str]: ...

    @const_fn(overloads=my_const_operator)
    def my_const_fn_int(tp: type[TS[int]]) -> TS[str]:
        return "int"

    @const_fn(overloads=my_const_operator)
    def my_const_fn_float(tp: type[TS[float]]) -> TS[str]:
        return "float"

    assert my_const_operator(TS[int]) == "int"
    assert my_const_operator(TS[float]) == "float"
```
6.3 Characteristics
| Property | Description |
|---|---|
| Return | Constant |
| Evaluation | Once at wiring time |
| Context | Can be used both in and outside graphs |
| Overloads | Supported |
7. Record/Replay System
7.1 Overview
The record/replay system enables:
- Recording graph execution for later analysis
- Replaying recorded data for testing
- Comparing recorded vs live execution (back-testing)

```mermaid
graph LR
    subgraph "Modes"
        REC[RECORD]
        REP[REPLAY]
        CMP[COMPARE]
        RST[RESET]
        RCV[RECOVER]
    end
    REC --> |"Capture"| STORAGE[Storage]
    REP --> |"Read"| STORAGE
    CMP --> |"Validate"| STORAGE
```
7.2 API Definition
```python
# From hgraph/_operators/_record_replay.py
from enum import IntFlag, auto


class RecordReplayEnum(IntFlag):
    """Enum for record/replay modes"""
    NONE = 0
    RECORD = auto()         # Records the recordable components
    REPLAY = auto()         # Replays the inputs
    COMPARE = auto()        # Back-testing mode - replays and compares outputs
    REPLAY_OUTPUT = auto()  # Replays outputs until last, then continues
    RESET = auto()          # Re-record ignoring current state
    RECOVER = auto()        # Recover state from recorded data and continue


class RecordReplayContext:
    """Context manager for record/replay operations"""

    def __init__(self, mode: RecordReplayEnum = RecordReplayEnum.RECORD, recordable_id: str = None):
        self._mode = mode
        self._recordable_id = recordable_id

    def __enter__(self) -> "RecordReplayContext":
        self._instance.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._instance.pop()


# Core operators
@operator
def record(ts: DEFAULT[TIME_SERIES_TYPE], key: str, recordable_id: str = None):
    """Record the ts value tied to recordable_id + key"""
    ...


@operator
def replay(key: str, tp: type[OUT] = AUTO_RESOLVE, recordable_id: str = None) -> OUT:
    """Replay the ts using the id provided in the context"""


@operator
def replay_const(key: str, tp: type[OUT] = AUTO_RESOLVE, recordable_id: str = None,
                 tm: datetime = None, as_of: datetime = None) -> OUT:
    """Return const time-series of values <= start_time for initialization"""


@operator
def compare(lhs: TIME_SERIES_TYPE, rhs: TIME_SERIES_TYPE):
    """Compare two time series (when COMPARE mode is set)"""
```
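Because the mode enum derives from `IntFlag`, its members are bit flags that can be combined and tested with `|` and `in`. The sketch below uses a stand-in enum, `Mode` (a hypothetical name with a subset of the members), to show the mechanics with the standard library only. Which combinations HGraph's operators actually honour is not stated in this section.

```python
from enum import IntFlag, auto


class Mode(IntFlag):
    """Stand-in with the same shape as RecordReplayEnum (hypothetical name)."""
    NONE = 0
    RECORD = auto()
    REPLAY = auto()
    COMPARE = auto()


# auto() assigns successive powers of two after the explicit zero member
combined = Mode.RECORD | Mode.COMPARE

is_recording = Mode.RECORD in combined   # membership test on a combined flag
is_replaying = Mode.REPLAY in combined
```

Here `is_recording` is true and `is_replaying` is false, since only the `RECORD` and `COMPARE` bits are set.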
7.3 Real-World Examples
Example 1: Simple Replay
```python
# From hgraph_unit_tests/nodes/test_replay.py
from hgraph import graph, TS, print_, replay, GlobalState, evaluate_graph, GraphConfiguration
from hgraph._impl._operators._record_replay_in_memory import SimpleArrayReplaySource, set_replay_values


def test_replay_simple():
    """Simple replay from in-memory source"""
    with GlobalState():
        # Configure replay source
        set_replay_values("test", SimpleArrayReplaySource(["1", "2", "3"]))

        @graph
        def main():
            values = replay("test", TS[str])
            print_(values)

        evaluate_graph(main, GraphConfiguration())
```
Example 2: Record and Replay Combined
```python
# From hgraph_unit_tests/nodes/test_recorder.py
from hgraph import (
    graph, TS, MIN_ST, MIN_TD, print_, replay, record, GlobalState, evaluate_graph, GraphConfiguration,
)
from hgraph._impl._operators._record_replay_in_memory import SimpleArrayReplaySource, set_replay_values, get_recorded_value


def test_recorder():
    """Record values while replaying"""
    with GlobalState():
        set_replay_values("test", SimpleArrayReplaySource(["1", "2", "3"]))

        @graph
        def main():
            value_ts = replay("test", TS[str])
            record(value_ts)  # Record the replayed values
            print_(value_ts)

        evaluate_graph(main, GraphConfiguration())

        # Verify recorded values
        values = get_recorded_value()
        assert values == [(MIN_ST, "1"), (MIN_ST + MIN_TD, "2"), (MIN_ST + 2 * MIN_TD, "3")]
```
Example 3: Generator-Based Replay Implementation
```python
# From hgraph/_impl/_operators/_record_replay_in_memory.py
class SimpleArrayReplaySource(ReplaySource):
    """Simple source that yields values at regular intervals"""

    def __init__(self, values: list[Any], start_time: datetime = MIN_ST):
        self.values = values
        self.start_time = start_time

    def __iter__(self) -> Iterable[tuple[datetime, Any]]:
        next_engine_time = self.start_time
        for value in self.values:
            if value is not None:
                yield next_engine_time, value
            next_engine_time += MIN_TD


@generator(overloads=replay, requires=record_replay_model_restriction(IN_MEMORY, True))
def replay_from_memory(key: str, tp: type[TIME_SERIES_TYPE] = AUTO_RESOLVE,
                       is_operator: bool = False, recordable_id: str = None,
                       _traits: Traits = None, _clock: EvaluationClock = None) -> TIME_SERIES_TYPE:
    """Replay values from global state"""
    recordable_id = get_fq_recordable_id(_traits, recordable_id)
    recordable_id = f":memory:{recordable_id}"
    source = GlobalState.instance().get(f"{recordable_id}.{key}", None)
    if source is None:
        raise ValueError(f"Replay source with label '{key}' does not exist")
    tm = _clock.evaluation_time
    for ts, v in source:
        if ts < tm:
            continue
        if v is not None:
            yield ts, v
```
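Two details of the replay code above are easy to miss: a `None` entry consumes a time slot without producing a tick, and entries stamped before the current evaluation time are skipped. The following standalone sketch reproduces both with plain Python. `START` and `STEP` are stand-ins for `MIN_ST` and `MIN_TD` with assumed values, and `array_replay` and `replay_from` are hypothetical helper names.

```python
from datetime import datetime, timedelta

STEP = timedelta(microseconds=1)       # stand-in for MIN_TD (assumed value)
START = datetime(1970, 1, 1) + STEP    # stand-in for MIN_ST (assumed value)


def array_replay(values, start=START, step=STEP):
    """Yield (time, value) pairs one step apart; None uses a slot without ticking."""
    t = start
    for value in values:
        if value is not None:
            yield t, value
        t += step


def replay_from(source, evaluation_time):
    """Keep only entries at or after the current evaluation time."""
    return [(ts, v) for ts, v in source if ts >= evaluation_time and v is not None]


ticks = list(array_replay(["1", None, "3"]))
late = replay_from(array_replay(["1", "2", "3"]), START + STEP)
```

In `ticks` the value `"3"` lands two steps after `"1"`, and `late` drops the first entry because it predates the evaluation time.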
Example 4: Record to Memory Implementation
```python
# From hgraph/_impl/_operators/_record_replay_in_memory.py
@sink_node(overloads=record, requires=record_replay_model_restriction(IN_MEMORY, True))
def record_to_memory(ts: TIME_SERIES_TYPE, key: str = "out", is_operator: bool = False,
                     recordable_id: str = None, _api: EvaluationEngineApi = None,
                     _state: STATE = None, _traits: Traits = None):
    """Record values to global state"""
    _state.record_value.append((_api.evaluation_clock.evaluation_time, ts.delta_value))


@record_to_memory.start
def record_to_memory_start(key: str, is_operator: bool, recordable_id: str,
                           _state: STATE, _traits: Traits):
    recordable_id = get_fq_recordable_id(_traits, recordable_id)
    recordable_id = f":memory:{recordable_id}.{key}"
    _state.recordable_id = recordable_id
    _state.record_value = []


@record_to_memory.stop
def record_to_memory_stop(_state: STATE, _api: EvaluationEngineApi):
    global_state = GlobalState.instance()
    global_state[_state.recordable_id] = _state.record_value
```
7.4 DataFrame Storage Backend
```python
# From hgraph/adaptors/data_frame/_data_frame_record_replay.py
from abc import ABC, abstractmethod
import polars as pl


class DataFrameStorage(ABC):
    """Abstract storage for recording/replaying dataframes"""

    @abstractmethod
    def read_frame(self, path: str, start_time: datetime = None,
                   end_time: datetime = None, as_of: datetime = None) -> pl.DataFrame:
        """Read dataframe with optional time range filtering"""

    @abstractmethod
    def write_frame(self, path: str, df: pl.DataFrame,
                    mode: WriteMode = WriteMode.OVERWRITE, as_of: datetime = None) -> pl.DataFrame:
        """Write dataframe with optional mode (EXTEND/OVERWRITE/MERGE)"""


class MemoryDataFrameStorage(BaseDataFrameStorage):
    """In-memory dataframe storage for testing"""

    def __init__(self):
        super().__init__()
        self._frames = {}

    def _write(self, path: Path, df: pl.DataFrame):
        self._frames[str(path)] = df

    def _read(self, path: Path) -> pl.DataFrame:
        return self._frames.get(str(path), None)


class FileBasedDataFrameStorage(BaseDataFrameStorage):
    """File-system-based dataframe storage using parquet"""

    def __init__(self, path: Path):
        super().__init__()
        self._path = path
        path.mkdir(parents=True, exist_ok=True)

    def _read(self, path: Path) -> pl.DataFrame:
        return pl.read_parquet(self._path / f"{path}.parquet")

    def _write(self, path, df):
        df.write_parquet(self._path / f"{path}.parquet")
```
DataFrame Record/Replay Tests:
```python
# From hgraph_unit_tests/adaptors/data_frame/test_data_frame_record_replay.py
from hgraph import GlobalState, set_record_replay_model, record, TS, set_as_of, MIN_ST, MIN_TD, replay, TSD
from hgraph.adaptors.data_frame import DATA_FRAME_RECORD_REPLAY, MemoryDataFrameStorage, set_data_frame_overrides
from hgraph.test import eval_node


def test_data_frame_record():
    """Record to dataframe storage"""
    with GlobalState() as gs, MemoryDataFrameStorage() as ds:
        set_record_replay_model(DATA_FRAME_RECORD_REPLAY)
        set_as_of(MIN_ST + MIN_TD * 30)
        eval_node(record[TS[int]], ts=[1, 2, 3], key="ts", recordable_id="test")
        assert len(ds._frames) == 1
        df = next(iter(ds._frames.values()))
        assert len(df) == 3
        assert df["value"][0] == 1


def test_data_frame_record_replay():
    """Record and replay from dataframe"""
    with GlobalState() as gs, MemoryDataFrameStorage() as ds:
        set_record_replay_model(DATA_FRAME_RECORD_REPLAY)
        set_as_of(MIN_ST + MIN_TD * 30)
        eval_node(record[TS[int]], ts=[1, 2, 3], key="ts", recordable_id="test")
        assert eval_node(replay[TS[int]], key="ts", recordable_id="test") == [1, 2, 3]


def test_data_frame_record_replay_tsd():
    """Record TSD with partitioning"""
    with GlobalState() as gs, MemoryDataFrameStorage() as ds:
        set_record_replay_model(DATA_FRAME_RECORD_REPLAY)
        set_as_of(MIN_ST + MIN_TD * 30)
        set_table_schema_date_key("date")
        set_data_frame_overrides(key="ts", recordable_id="test", track_as_of=False,
                                 track_removes=False, partition_keys=["id"])
        eval_node(record[TSD[str, TS[int]]], ts=[{"a": 1}, {"b": 2}, {"a": 3}],
                  key="ts", recordable_id="test")
        df = next(iter(ds._frames.values()))
        assert [k for k in df.schema] == ["date", "id", "value"]
```
7.5 Storage Configuration
```python
from pathlib import Path

from hgraph import set_record_replay_model, IN_MEMORY
from hgraph.adaptors.data_frame import (
    DATA_FRAME_RECORD_REPLAY,
    set_data_frame_overrides,
    set_data_frame_record_path,
)

# Use in-memory storage
set_record_replay_model(IN_MEMORY)

# Use DataFrame storage
set_record_replay_model(DATA_FRAME_RECORD_REPLAY)
set_data_frame_record_path(Path("/path/to/recordings"))

# Configure DataFrame recording options
set_data_frame_overrides(
    key="my_key",
    recordable_id="my_id",
    track_as_of=True,       # Track as_of column
    track_removes=True,     # Track remove operations
    partition_keys=["id"],  # Partition by id column
)
```
8. Source Node Comparison
| Source Type | Scheduling | External Events | Deterministic | Mode |
|---|---|---|---|---|
| pull_source_node | Self-scheduled | No | Yes | Both |
| push_source_node | Event-driven | Yes | No | Real-time |
| generator | Yield-based | No | Yes | Both |
| push_queue | Event-driven | Yes | No | Real-time |
| const_fn | Once | No | Yes | Both |
9. Execution Modes
9.1 Simulation Mode
```mermaid
graph LR
    T1[t=0] --> |"instant"| T2[t=100]
    T2 --> |"instant"| T3[t=500]
    T3 --> |"instant"| T4[t=1000]
```

Characteristics:
- Time jumps to the next scheduled event
- Push sources not allowed
- Deterministic execution
- Reproducible results
9.2 Real-Time Mode
```mermaid
graph LR
    T1[t=0] --> |"sleep 100ms"| T2[t=100]
    T2 --> |"sleep 400ms"| T3[t=500]
    T3 --> |"sleep 500ms"| T4[t=1000]
```

Characteristics:
- Time tracks the wall clock
- Push sources allowed
- Non-deterministic (external events)
- May process late events
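The contrast between the two modes can be reduced to how the gap between consecutive events is handled. The sketch below is a toy model, not HGraph's engine: `run` is a hypothetical function, and the `sleep` parameter is injectable so the pacing can be observed without actually waiting.

```python
import time
from datetime import timedelta


def run(events: list[timedelta], real_time: bool, sleep=time.sleep):
    """Toy model: visit event offsets either instantly or paced by the wall clock."""
    visited, elapsed = [], timedelta()
    for offset in sorted(events):
        if real_time:
            # Real-time: wait for the wall clock to reach the event
            sleep((offset - elapsed).total_seconds())
        elapsed = offset   # Simulation: jump straight to the event time
        visited.append(offset)
    return visited


sleeps = []   # records requested sleep durations instead of sleeping
offsets = [timedelta(milliseconds=ms) for ms in (100, 500, 1000)]
simulated = run(offsets, real_time=False, sleep=sleeps.append)
paced = run(offsets, real_time=True, sleep=sleeps.append)
```

Both runs visit the same events in the same order. Only the real-time run requests waits, and they match the 100 ms, 400 ms and 500 ms gaps in the diagram above.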
9.3 Mode Selection
```python
from datetime import timedelta

from hgraph import evaluate_graph, EvaluationMode, GraphConfiguration

# Simulation (default)
evaluate_graph(my_graph, GraphConfiguration(
    start_time=start_time,
    end_time=end_time,
    run_mode=EvaluationMode.SIMULATION,
))

# Real-time
evaluate_graph(my_graph, GraphConfiguration(
    start_time=start_time,
    end_time=timedelta(minutes=5),  # Duration for real-time
    run_mode=EvaluationMode.REAL_TIME,
))
```
10. const (Constant Source)
10.1 Operator Signature
```python
# From hgraph/_operators/_time_series_conversion.py
@operator
def const(
    value: SCALAR,
    tp: type[OUT] = TS[SCALAR],
    delay: timedelta = timedelta(),
) -> DEFAULT[OUT]:
    """
    Produces a single tick at the start of the graph evaluation.

    :param value: The value to emit
    :param tp: Output type, defaults to TS[SCALAR]
    :param delay: Amount of time to delay the value
    :return: A single tick of the value supplied
    """
```
10.2 Usage Examples
```python
# From hgraph_unit_tests/_operators/test_const.py
from frozendict import frozendict

from hgraph import const, MIN_TD, TS, TSD
from hgraph.test import eval_node


def test_const():
    assert eval_node(const, 1) == [1]


def test_delayed_const():
    assert eval_node(const, 1, delay=MIN_TD * 2) == [None, None, 1]


# Create different types (when wiring a graph)
forty_two = const(42)                                    # TS[int]
greeting = const("hello")                                # TS[str]
config = const(frozendict({"a": 1}), TSD[str, TS[int]])  # TSD
```

10.3 Behavior
- Outputs the value once at start (or after the delay)
- Never modified again
- Remains valid for the entire graph lifetime
11. nothing (Null Source)
11.1 Operator Signature
````python
# From hgraph/_operators/_graph_operators.py
@operator
def nothing(tp: type[OUT] = AUTO_RESOLVE) -> DEFAULT[OUT]:
    """
    Produces no ticks ever. Can be used in two ways:
    ```python
    nothing[TS[int]]()
    ```
    or
    ```python
    nothing(TS[int])
    ```
    This is equivalent to None for time-series inputs.
    """
````
11.2 Usage
```python
from hgraph import nothing, TS

# Create unbound/null time-series
empty_int = nothing(TS[int])

# Or using type indexing
empty_str = nothing[TS[str]]()
```
11.3 Behavior
- Creates a time-series that is never valid
- Useful as a placeholder or for optional paths
- Equivalent to `None` for unconnected inputs
12. Reference Locations
| Concept | Python Location |
|---|---|
| @pull_source_node | |
| @push_source_node | |
| @generator | |
| @push_queue | |
| @const_fn | |
| Record/Replay API | |
| In-Memory Storage | |
| DataFrame Storage | |
| const | |
| nothing | |
13. Summary
HGraph’s data source system provides:
- Pull sources (`@pull_source_node`, `@generator`) for scheduled, deterministic data generation
- Push sources (`@push_source_node`, `@push_queue`) for asynchronous external event handling
- Generators for convenient Python generator integration with rich timing control
- Push queues with batching and eliding for high-throughput external data injection
- `const_fn` for dual-use scalar functions in and outside graphs
- Record/Replay for testing, debugging, and back-testing with multiple storage backends
- Mode selection for simulation vs real-time execution
The separation of pull and push sources ensures that simulation mode remains deterministic while real-time mode can handle external events. The record/replay system enables reproducible testing and back-testing of live systems.