Source code for flowno.core.flow_hdl

"""
FlowHDL: Hardware Description Language-inspired context for defining dataflow graphs.

This module provides the FlowHDL context manager which allows users to:
- Define nodes and their connections in any order
- Forward reference nodes before they are created (for cyclic dependencies)
- Automatically finalize node connections when exiting the context

Example:
    >>> from flowno import FlowHDL, node
    >>>
    >>> @node
    ... async def Add(x, y):
    ...     return x + y
    ...
    >>> @node
    ... async def Source(value):
    ...     return value
    ...
    >>> with FlowHDL() as f:
    ...     f.output = Add(f.input1, f.input2)  # Reference nodes before definition
    ...     f.input1 = Source(1)                # Define nodes in any order
    ...     f.input2 = Source(2)
    ...
    >>> f.run_until_complete()
    >>> f.output.get_data()
    (3,)
"""

import inspect
import logging
from types import TracebackType
from typing import Any, Callable, ClassVar, cast

from flowno.core.event_loop.commands import Command
from flowno.core.event_loop.types import RawTask
from flowno.core.event_loop.tasks import TaskHandle
from flowno.core.flow.flow import Flow
from flowno.core.flow_hdl_view import FlowHDLView
from flowno.core.node_base import (
    DraftInputPortRef,
    DraftNode,
    FinalizedNode,
    NodeContextFactoryProtocol,
    NodePlaceholder,
    OutputPortRefPlaceholder,
)
from flowno.core.types import Generation
from typing_extensions import Self, TypeVarTuple, Unpack, override

_Ts = TypeVarTuple("_Ts")


logger = logging.getLogger(__name__)


[docs] class FlowHDL(FlowHDLView): """Context manager for constructing and executing dataflow graphs. ``FlowHDL`` extends :class:`FlowHDLView` with the ability to run the resulting :class:`~flowno.core.flow.flow.Flow`. Within the ``with`` block users may assign draft nodes to attributes and reference not-yet-defined nodes freely. When the context exits, all placeholders are resolved and the underlying :class:`Flow` is finalized. Example ------- >>> with FlowHDL() as f: ... f.result = Add(f.a, f.b) ... f.a = Source(1) ... f.b = Source(2) >>> f.run_until_complete() User defined attribute names should not start with an underscore. :canonical: :py:class:`flowno.core.flow_hdl.FlowHDL` """ KEYWORDS: ClassVar[list[str]] = [ "KEYWORDS", "run_until_complete", "create_task", "register_child_result", ] """Keywords that should not be treated as nodes in the graph.""" def __init__(self) -> None: def _on_register_finalized_node(node: FinalizedNode) -> None: """Callback to handle finalized nodes.""" self._flow.add_node(node) super().__init__(on_register_finalized_node=_on_register_finalized_node) self._flow: Flow = Flow(is_finalized=False)
[docs] @override def __getattribute__(self, key): return super().__getattribute__(key)
@override def __getattr__(self, key): return super().__getattr__(key)
[docs] def run_until_complete( self, stop_at_node_generation: ( dict[ DraftNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]], Generation, ] | Generation ) = (), terminate_on_node_error: bool = True, _debug_max_wait_time: float | None = None, context_factory: Callable[["FinalizedNode"], Any] | None = None, ) -> None: """Run the flow until all nodes have completed processing. Args: stop_at_node_generation: Optional generation number or mapping of nodes to generation numbers to stop execution at terminate_on_node_error: Whether to terminate the entire flow if any node raises an exception _debug_max_wait_time: Maximum time to wait for nodes to complete (for debugging only) """ self._flow.run_until_complete( stop_at_node_generation=stop_at_node_generation, terminate_on_node_error=terminate_on_node_error, _debug_max_wait_time=_debug_max_wait_time, context_factory=context_factory, )
[docs] def create_task( self, raw_task: RawTask[Command, Any, Any], ) -> "TaskHandle[Command]": """ Create a new task handle for the given raw task and enqueue the task in the event loop's task queue. Args: raw_task: The raw task to create a handle for. Returns: A TaskHandle object representing the created task. """ return self._flow.event_loop.create_task(raw_task)