flowno.core.flow_hdl
FlowHDL: Hardware Description Language-inspired context for defining dataflow graphs.
This module provides the FlowHDL context manager which allows users to: - Define nodes and their connections in any order - Forward reference nodes before they are created (for cyclic dependencies) - Automatically finalize node connections when exiting the context
Example
>>> from flowno import FlowHDL, node
>>>
>>> @node
... async def Add(x, y):
... return x + y
...
>>> @node
... async def Source(value):
... return value
...
>>> with FlowHDL() as f:
... f.output = Add(f.input1, f.input2) # Reference nodes before definition
... f.input1 = Source(1) # Define nodes in any order
... f.input2 = Source(2)
...
>>> f.run_until_complete()
>>> f.output.get_data()
(3,)
- class flowno.core.flow_hdl.FlowHDL[source]
Context manager for constructing and executing dataflow graphs.
FlowHDLextendsFlowHDLViewwith the ability to run the resultingFlow. Within thewithblock users may assign draft nodes to attributes and reference not-yet-defined nodes freely. When the context exits, all placeholders are resolved and the underlyingFlowis finalized.Example
>>> with FlowHDL() as f: ... f.result = Add(f.a, f.b) ... f.a = Source(1) ... f.b = Source(2) >>> f.run_until_complete()
User defined attribute names should not start with an underscore.
- Canonical:
- KEYWORDS: ClassVar[list[str]] = ['KEYWORDS', 'run_until_complete', 'create_task', 'register_child_result']
Keywords that should not be treated as nodes in the graph.
- __getattribute__(key)[source]
Override the default attribute getter to return a placeholder for undefined attributes.
Treats attributes starting with an underscore or in the KEYWORDS list as normal attributes.
- create_task(raw_task: Coroutine[Command, Any, Any]) TaskHandle[Command][source]
Create a new task handle for the given raw task and enqueue the task in the event loop’s task queue.
- Parameters:
raw_task – The raw task to create a handle for.
- Returns:
A TaskHandle object representing the created task.
- run_until_complete(stop_at_node_generation: dict[DraftNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = True, _debug_max_wait_time: float | None = None, context_factory: Callable[[FinalizedNode], Any] | None = None) None[source]
Run the flow until all nodes have completed processing.
- Parameters:
stop_at_node_generation – Optional generation number or mapping of nodes to generation numbers to stop execution at
terminate_on_node_error – Whether to terminate the entire flow if any node raises an exception
_debug_max_wait_time – Maximum time to wait for nodes to complete (for debugging only)