Source code for hgraph._runtime._evaluation_clock

from abc import abstractmethod
from datetime import datetime, timedelta

__all__ = ("EvaluationClock", "EngineEvaluationClock")


[docs] class EvaluationClock: """ The evaluation clock provides a view on time in the currently evaluating graph. Time in the graph is dependent on the mode of execution. In simulation mode, the clock presented is simulated to allow for processing historical data in rapid time. In realtime mode, the clock represent the system clock (time is always presented in UTC). By using the clock abstraction to query time-related information it is possible to ensure that logic can be written independently of how the system chooses to move through time. """ @property @abstractmethod def evaluation_time(self) -> datetime: """ The time of the source event initiating this evaluation cycle. This time remains the same for each node processed until the graph completes one complete step through the nodes in the graph. """ @property @abstractmethod def now(self) -> datetime: """ This represents the current time, in a real time engine, this will represent the system clock. In simulation mode this will represent the evaluation_time + lag. """ @property @abstractmethod def cycle_time(self) -> timedelta: """ The amount of time spent in computation since the beginning of the evaluation of the graph till the point where this property is called. """ @property @abstractmethod def next_cycle_evaluation_time(self) -> datetime: """ The next smallest evaluation time that can be scheduled. This is a convenience method and is equivalent to evaluation_time + MIN_TD """
class EngineEvaluationClock(EvaluationClock): """ Extends the evaluation clock to provide additional functionality for the engine. The engine needs to be able to interact with the clock. The user api is limited to the EvaluationClock interface. """ @property @abstractmethod def evaluation_time(self) -> datetime: """ The time of the source event initiating this evaluation cycle. This time remains the same for each node processed until the graph completes one complete step through the nodes in the graph. """ @evaluation_time.setter @abstractmethod def evaluation_time(self, value: datetime): """ Set the evaluation time. This should only be done by the graph engine. """ @property @abstractmethod def next_scheduled_evaluation_time(self) -> datetime: """ The next smallest evaluation time that a node is currently scheduled to be evaluated. """ @abstractmethod def update_next_scheduled_evaluation_time(self, next_time: datetime): """ This will ensure the next scheduled time is updated to reflect the earliest time that a node is scheduled to evaluate. """ @abstractmethod def advance_to_next_scheduled_time(self): """ Advance the clock to the next scheduled time. For a simulation clock, this will set the evaluation time to the next scheduled time and return. For a real time clock, this will block until the next scheduled time or the engine is notified by a push node that will cause the engine to set the evaluation time ot the lesser of now or the next scheduled time. """ @abstractmethod def mark_push_node_requires_scheduling(self): """ Push nodes are not scheduled using time as is the case for pull source nodes. For push nodes, when an event arrives, the engine will schedule the node to be evaluated at the next soonest time, effectively interleaving push events with what ever scheduled events are being evaluated at the time. This method is only ever called in a real-time graph as simulation graphs are not allowed to have push nodes. """ @property @abstractmethod def push_node_requires_scheduling(self) -> bool: """ True if there are any push nodes have requested to be scheduled, False otherwise. """ @abstractmethod def reset_push_node_requires_scheduling(self): """ Reset the push_has_pending_values property. """ class EngineEvaluationClockDelegate(EngineEvaluationClock): """Support adding new logic to the engine evaluation clock.""" def __init__(self, engine_evaluation_clock: EngineEvaluationClock): self._engine_evaluation_clock = engine_evaluation_clock @property def evaluation_time(self) -> datetime: return self._engine_evaluation_clock.evaluation_time @evaluation_time.setter def evaluation_time(self, value: datetime): self._engine_evaluation_clock.evaluation_time = value @property def next_scheduled_evaluation_time(self) -> datetime: return self._engine_evaluation_clock.next_scheduled_evaluation_time def update_next_scheduled_evaluation_time(self, next_time: datetime): self._engine_evaluation_clock.update_next_scheduled_evaluation_time(next_time) def advance_to_next_scheduled_time(self): self._engine_evaluation_clock.advance_to_next_scheduled_time() def mark_push_node_requires_scheduling(self): self._engine_evaluation_clock.mark_push_node_requires_scheduling() @property def push_node_requires_scheduling(self) -> bool: return self._engine_evaluation_clock.push_node_requires_scheduling def reset_push_node_requires_scheduling(self): self._engine_evaluation_clock.reset_push_node_requires_scheduling() @property def now(self) -> datetime: return self._engine_evaluation_clock.now @property def cycle_time(self) -> timedelta: return self._engine_evaluation_clock.cycle_time @property def next_cycle_evaluation_time(self) -> datetime: return self._engine_evaluation_clock.next_cycle_evaluation_time