flowno.core.flow_hdl

FlowHDL: Hardware Description Language-inspired context for defining dataflow graphs.

This module provides the FlowHDL context manager, which allows users to:
  • Define nodes and their connections in any order

  • Forward reference nodes before they are created (for cyclic dependencies)

  • Automatically finalize node connections when exiting the context

Example

>>> from flowno import FlowHDL, node
>>>
>>> @node
... async def Add(x, y):
...     return x + y
...
>>> @node
... async def Source(value):
...     return value
...
>>> with FlowHDL() as f:
...     f.output = Add(f.input1, f.input2)  # Reference nodes before definition
...     f.input1 = Source(1)                # Define nodes in any order
...     f.input2 = Source(2)
...
>>> f.run_until_complete()
>>> f.output.get_data()
(3,)
class flowno.core.flow_hdl.FlowHDL

Context manager for constructing and executing dataflow graphs.

FlowHDL extends FlowHDLView with the ability to run the resulting Flow. Within the with block, users may assign draft nodes to attributes and freely reference nodes that are not yet defined. When the context exits, all placeholders are resolved and the underlying Flow is finalized.

Example

>>> with FlowHDL() as f:
...     f.result = Add(f.a, f.b)
...     f.a = Source(1)
...     f.b = Source(2)
>>> f.run_until_complete()

User-defined attribute names should not start with an underscore; such names are treated as normal attributes rather than as nodes in the graph.
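To illustrate how forward references could be resolved when the with block exits, here is a standalone sketch that does not import flowno. Ref, Draft, and MiniContext are hypothetical names, and the use of __getattr__ is an assumption made for brevity; FlowHDL's real implementation may differ.

```python
class Ref:
    """Hypothetical forward reference to an attribute name."""

    def __init__(self, name):
        self.name = name


class Draft:
    """Hypothetical draft node: a label plus its (possibly unresolved) inputs."""

    def __init__(self, label, *inputs):
        self.label = label
        self.inputs = list(inputs)


class MiniContext:
    """Hypothetical sketch of resolve-on-exit; not flowno's FlowHDL."""

    def __enter__(self):
        return self

    def __getattr__(self, name):
        # Only called when normal lookup fails, i.e. the node is not defined yet.
        if name.startswith("_"):
            raise AttributeError(name)
        return Ref(name)

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            return False  # do not finalize a graph whose body raised
        nodes = {k: v for k, v in vars(self).items() if isinstance(v, Draft)}
        for node in nodes.values():
            for i, inp in enumerate(node.inputs):
                if isinstance(inp, Ref):
                    if inp.name not in nodes:
                        raise AttributeError(
                            f"{inp.name!r} was referenced but never defined"
                        )
                    # Swap the forward reference for the real node.
                    node.inputs[i] = nodes[inp.name]
        return False


with MiniContext() as g:
    g.total = Draft("add", g.a, g.b)   # g.a and g.b are still Ref objects here
    g.a = Draft("source-1")
    g.b = Draft("source-2", g.total)   # a cycle: b depends on total

print([n.label for n in g.total.inputs])
```

Deferring resolution to the exit step is what lets definitions appear in any order and lets two nodes refer to each other, since every name is known by the time the references are replaced.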

Canonical:

flowno.core.flow_hdl.FlowHDL

KEYWORDS: ClassVar[list[str]] = ['KEYWORDS', 'run_until_complete', 'create_task', 'register_child_result']

Keywords that should not be treated as nodes in the graph.

__getattribute__(key)

Override the default attribute getter to return a placeholder for undefined attributes.

Attributes whose names start with an underscore or appear in the KEYWORDS list are looked up as normal attributes.
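The lookup rule above can be illustrated with a standalone sketch that does not use flowno. Placeholder and MiniHDL are hypothetical names, and the logic shows the general technique under stated assumptions rather than FlowHDL's actual code.

```python
class Placeholder:
    """Hypothetical stand-in for a node that has not been assigned yet."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"Placeholder({self.name!r})"


class MiniHDL:
    """Hypothetical sketch; not flowno's FlowHDL."""

    # Names that must always resolve through normal attribute lookup.
    KEYWORDS = ["KEYWORDS", "run_until_complete"]

    def run_until_complete(self):
        return "ran"

    def __getattribute__(self, key):
        # Private names and keywords bypass the placeholder logic.
        if key.startswith("_") or key in object.__getattribute__(self, "KEYWORDS"):
            return object.__getattribute__(self, key)
        try:
            return object.__getattribute__(self, key)
        except AttributeError:
            # Undefined public attribute: hand back a forward reference.
            return Placeholder(key)


hdl = MiniHDL()
forward_ref = hdl.input1      # not assigned yet, so a Placeholder comes back
hdl.input1 = 42               # later assignment replaces the forward reference
print(forward_ref, hdl.input1, hdl.run_until_complete())
```

Routing underscore-prefixed names and keywords through object.__getattribute__ keeps internal state and methods reachable, which is why such names cannot double as node names.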

create_task(raw_task: Coroutine[Command, Any, Any]) → TaskHandle[Command]

Create a new task handle for the given raw task and enqueue the task in the event loop’s task queue.

Parameters:

raw_task – The raw task to create a handle for.

Returns:

A TaskHandle object representing the created task.
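As a rough illustration of the "wrap in a handle, then enqueue" pattern, the following standalone sketch uses a plain deque as the task queue. MiniTaskHandle and MiniLoop are hypothetical names; flowno's own TaskHandle and event loop are not used here and may work differently.

```python
from collections import deque


class MiniTaskHandle:
    """Hypothetical handle that tracks one coroutine."""

    def __init__(self, coro):
        self.coro = coro
        self.done = False
        self.result = None


class MiniLoop:
    """Hypothetical round-robin loop; not flowno's event loop."""

    def __init__(self):
        self.tasks = deque()

    def create_task(self, raw_task):
        # Wrap the coroutine in a handle and enqueue it for later execution.
        handle = MiniTaskHandle(raw_task)
        self.tasks.append(handle)
        return handle

    def run(self):
        while self.tasks:
            handle = self.tasks.popleft()
            try:
                handle.coro.send(None)   # advance the coroutine one step
            except StopIteration as stop:
                handle.done = True
                handle.result = stop.value
            else:
                self.tasks.append(handle)  # not finished: requeue it


async def answer():
    return 42


loop = MiniLoop()
handle = loop.create_task(answer())
loop.run()
print(handle.done, handle.result)
```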

run_until_complete(stop_at_node_generation: dict[DraftNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = True, _debug_max_wait_time: float | None = None, context_factory: Callable[[FinalizedNode], Any] | None = None) → None

Run the flow until all nodes have completed processing.

Parameters:
  • stop_at_node_generation – Optional generation number, or a mapping of nodes to generation numbers, at which to stop execution

  • terminate_on_node_error – Whether to terminate the entire flow if any node raises an exception

  • _debug_max_wait_time – Maximum time to wait for nodes to complete (for debugging only)