flowno.core.flow.flow
Flow execution and graph resolution module for Flowno.
This module contains the Flow class, which is the core execution engine for dataflow graphs. It manages node scheduling, dependency resolution, cycle breaking, and concurrent execution.
- Key components:
Flow: The main dataflow graph execution engine
FlowEventLoop: A custom event loop for handling Flow-specific commands
NodeTaskStatus: State tracking for node execution
- class flowno.core.flow.flow.Flow(is_finalized: bool = True)
Dataflow graph execution engine.
The Flow class manages the execution of a dataflow graph, handling dependency resolution, node scheduling, and cycle breaking. It uses a custom event loop to execute nodes concurrently while respecting data dependencies.
- Key features:
Automatic dependency-based scheduling
Cycle detection and resolution
Support for streaming data (run levels)
Concurrency management
- unvisited
List of nodes that have not yet been visited during execution
- visited
Set of nodes that have been visited
- node_tasks
Dictionary mapping nodes to their tasks and status
- Type:
dict[flowno.core.node_base.FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]], flowno.core.flow.flow.NodeTaskAndStatus]
- resolution_queue
Queue of nodes waiting to be resolved
- Type:
flowno.core.event_loop.queues.AsyncSetQueue[flowno.core.node_base.FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]]]
- _condensed_tree(head: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]) → SuperNode
Build a condensed graph of strongly connected components (SCCs) from stale connections.
This method applies Tarjan’s algorithm to find the strongly connected components (cycles) of the dependency graph, following only connections that are “stale” (those where the input’s generation is less than or equal to the node’s generation).
- Parameters:
head – The starting point for building the condensed graph
- Returns:
A SuperNode representing the root of the condensed graph
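The SCC step can be sketched with a textbook recursive Tarjan's algorithm over a plain dictionary graph. This is a minimal sketch, not Flowno's code: nodes are strings, `deps` maps each node to the nodes it reads from, generations are plain tuples compared with Python's tuple ordering, and the hypothetical helpers `stale_graph` and `tarjan_scc` return frozensets instead of SuperNode objects.

```python
from collections.abc import Hashable, Iterable, Mapping

Graph = Mapping[Hashable, Iterable[Hashable]]


def stale_graph(
    deps: Graph, generation: Mapping[Hashable, tuple[int, ...]]
) -> dict[Hashable, list[Hashable]]:
    """Keep only edges node -> input where the input's generation is <= the node's."""
    return {
        node: [inp for inp in inputs if generation[inp] <= generation[node]]
        for node, inputs in deps.items()
    }


def tarjan_scc(graph: Graph) -> list[frozenset[Hashable]]:
    """Return the strongly connected components of ``graph`` (recursive Tarjan)."""
    index_of: dict[Hashable, int] = {}
    lowlink: dict[Hashable, int] = {}
    stack: list[Hashable] = []
    on_stack: set[Hashable] = set()
    sccs: list[frozenset[Hashable]] = []

    def visit(v: Hashable) -> None:
        # Discovery index = number of nodes seen so far.
        index_of[v] = lowlink[v] = len(index_of)
        stack.append(v)
        on_stack.add(v)
        for w in graph.get(v, ()):
            if w not in index_of:
                visit(w)  # tree edge: recurse, then propagate the low-link
                lowlink[v] = min(lowlink[v], lowlink[w])
            elif w in on_stack:
                lowlink[v] = min(lowlink[v], index_of[w])  # edge back into the stack
        if lowlink[v] == index_of[v]:
            # v roots a component: pop everything down to v off the stack.
            component = set()
            while True:
                w = stack.pop()
                on_stack.discard(w)
                component.add(w)
                if w == v:
                    break
            sccs.append(frozenset(component))

    for v in graph:
        if v not in index_of:
            visit(v)
    return sccs
```

With `deps = {"a": ["b"], "b": ["a"], "c": ["a"]}` and equal generations, the components are {a, b} and {c}. Raising `a` to generation `(1,)` removes the edges into `a`, because `a`'s data is then newer than its readers, so the cycle no longer appears among stale connections.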
- async _enqueue_node(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]])
Enqueue a single node for resolution.
- Parameters:
node – The node to enqueue
- async _enqueue_output_nodes(out_node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]])
Enqueue all nodes that depend on the given node.
- Parameters:
out_node – The node whose dependents should be enqueued
- _find_leaf_supernodes(root: SuperNode) → list[SuperNode]
Identify all leaf supernodes in the condensed DAG. Leaf supernodes are those with no dependencies.
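Finding leaves amounts to checking, for each component, whether any dependency edge leaves it. A minimal sketch, assuming the graph is a dictionary from node to the nodes it depends on and the components are given as frozensets; `leaf_components` is a hypothetical helper, not Flowno's implementation.

```python
from collections.abc import Hashable, Iterable, Mapping


def leaf_components(
    graph: Mapping[Hashable, Iterable[Hashable]],
    sccs: list[frozenset[Hashable]],
) -> list[frozenset[Hashable]]:
    """Return the SCCs that have no dependency edge leaving the component."""
    component_of = {node: scc for scc in sccs for node in scc}
    leaves = []
    for scc in sccs:
        # An edge into a different SCC is a dependency in the condensed DAG;
        # edges that stay inside the SCC are just the cycle itself.
        has_outside_dependency = any(
            component_of[dep] != scc for node in scc for dep in graph.get(node, ())
        )
        if not has_outside_dependency:
            leaves.append(scc)
    return leaves
```

For `{"a": ["b"], "b": ["a"], "c": ["a"]}` with components {a, b} and {c}, only {a, b} is a leaf, because `c` still waits on `a`.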
- _find_node_solution(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]) → list[FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]]
Find the nodes that are ultimately preventing the given node from running.
- This method is key to Flowno’s cycle resolution algorithm. It:
Builds a condensed graph of strongly connected components (SCCs)
Finds the leaf SCCs in this condensed graph
For each leaf SCC, picks a node to force evaluate based on default values
- Parameters:
node – The node whose dependencies need to be resolved
- Returns:
A list of nodes that should be forced to evaluate to unblock the given node
- Raises:
MissingDefaultError – If a cycle is detected with no default values to break it
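The last step, choosing a node to force evaluate in each leaf SCC, can be sketched as follows. The exact heuristic is not described here, so this sketch assumes one: in a fixed order, pick the first node whose inputs from inside the cycle all have default values; a lone node without a self-loop is simply returned. `pick_force_evaluate`, the `defaulted` mapping, and the local `MissingDefaultError` class are hypothetical stand-ins.

```python
from collections.abc import Hashable, Iterable, Mapping


class MissingDefaultError(Exception):
    """Local stand-in for Flowno's exception of the same name."""


def pick_force_evaluate(
    scc: frozenset[Hashable],
    graph: Mapping[Hashable, Iterable[Hashable]],
    defaulted: Mapping[Hashable, set[Hashable]],
) -> Hashable:
    """Pick one node of a leaf SCC to run before its inputs are fresh.

    ``defaulted[node]`` holds the upstream nodes whose connection into
    ``node`` has a default value (a hypothetical representation).
    """
    members = sorted(scc, key=repr)  # fixed order keeps the choice deterministic
    is_cycle = len(scc) > 1 or any(n in graph.get(n, ()) for n in scc)
    if not is_cycle:
        return members[0]  # a lone node without a self-loop has nothing to break
    for node in members:
        in_cycle = [dep for dep in graph.get(node, ()) if dep in scc]
        # The node can start only if every input fed from inside the cycle
        # can fall back on a default value.
        if all(dep in defaulted.get(node, set()) for dep in in_cycle):
            return node
    raise MissingDefaultError(f"no default values break the cycle {members}")
```

With `{"a": ["b"], "b": ["a"]}` and a default only on `b`'s input from `a`, the sketch returns `"b"`; with no defaults at all it raises `MissingDefaultError`.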
- async _handle_async_generator_node(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], returned: AsyncGenerator[tuple[object, ...], None])
Handle a node that returns an async generator (streaming output).
This processes each item yielded by the generator, storing it as run level 1 data, and accumulates the items into the final run level 0 result when the generator completes.
- Parameters:
node – The node to handle
returned – The async generator returned by the node’s call
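The two run levels can be illustrated with a small asyncio sketch. It assumes string chunks and accumulation by concatenation, and uses a plain dictionary keyed by run level in place of the node's data; `drain_stream` and `words` are hypothetical names, and real Flowno nodes yield tuples.

```python
import asyncio
from collections.abc import AsyncGenerator


async def drain_stream(gen: AsyncGenerator[str, None], data: dict[int, str]) -> str:
    """Store each chunk at run level 1, then the accumulated value at run level 0."""
    chunks: list[str] = []
    async for chunk in gen:
        data[1] = chunk  # latest streamed item (run level 1)
        chunks.append(chunk)
    # Accumulating by string concatenation is an assumption of this sketch.
    data[0] = "".join(chunks)  # final result (run level 0)
    return data[0]


async def words() -> AsyncGenerator[str, None]:
    for word in ("Hello", ", ", "world"):
        yield word


if __name__ == "__main__":
    levels: dict[int, str] = {}
    print(asyncio.run(drain_stream(words(), levels)))  # Hello, world
```

After the run, run level 1 holds only the last chunk (`"world"`), while run level 0 holds the complete value.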
- async _handle_coroutine_node(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], returned: Awaitable[tuple[object, ...]])
Handle a node that returns a coroutine (single output).
This awaits the result of the node’s coroutine and stores the result in the node’s data.
- Parameters:
node – The node to handle
returned – The coroutine returned by the node’s call
- _mark_node_as_visited(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]])
Mark a node as visited during the resolution process.
- Parameters:
node – The node to mark as visited
- _node_resolve_loop(stop_at_node_generation: dict[FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], tuple[int, ...] | None] | tuple[int, ...] | None, terminate_on_node_error: bool)
Main resolution loop for the flow.
This function implements the core algorithm for resolving node dependencies and executing nodes in the correct order. It:
Picks an initial node
- For each node in the resolution queue:
Finds the set of nodes that must be executed first
Marks those nodes as visited
Resumes their execution
Continues until the resolution queue is empty
- Parameters:
stop_at_node_generation – Generation limit for nodes
terminate_on_node_error – Whether to terminate on node errors
- _pick_node_to_force_evaluate(leaf_supernode: SuperNode) → FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]
Pick a node to force evaluate according to the cycle breaking heuristic.
- Parameters:
leaf_supernode (SuperNode) – The leaf supernode of the condensed subgraph.
- Returns:
The node to force evaluate.
- Return type:
FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]]
- Undefined Behavior:
If the argument is not a leaf in the condensed graph, the behavior is undefined.
- _register_node(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]])
Register a node’s task with the flow.
This creates the persistent task for the node and adds it to the node_tasks dictionary.
- Parameters:
node – The node to register
- _stall_stream_consumer(consumer_node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], producer_node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], stream: Stream[object], consumer_task: Generator[object, object, object], state_key: tuple[FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], int, FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], int, int], event_loop: EventLoop) → None
Stall a stream consumer and potentially wake the producer.
Called when a consumer requests stream data that isn’t available yet. Updates consumer status, registers it for notification when data arrives, and enqueues the producer if it’s idle.
- PRODUCER STATUS → ACTION:
WaitingForStartNextGeneration: Enqueue (producer is idle)
Stalled: Enqueue (producer blocked, needs cycle resolution)
Queued: No action (already pending)
Executing: No action (will push data when it yields)
Error: End stream with StopAsyncIteration
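The producer-status table is a total mapping, so it can be written down directly as a lookup. In this sketch the statuses are a plain enum for brevity, whereas Flowno's statuses are separate classes (Stalled, for instance, carries a stalling_input); `ProducerStatus`, `Action`, and `on_consumer_stall` are hypothetical names.

```python
from enum import Enum, auto


class ProducerStatus(Enum):
    WAITING_FOR_START_NEXT_GENERATION = auto()
    STALLED = auto()
    QUEUED = auto()
    EXECUTING = auto()
    ERROR = auto()


class Action(Enum):
    ENQUEUE = auto()  # wake the producer
    NO_ACTION = auto()  # the producer will deliver data on its own
    END_STREAM = auto()  # finish the consumer with StopAsyncIteration


# One entry per row of the PRODUCER STATUS -> ACTION table.
STALL_ACTIONS: dict[ProducerStatus, Action] = {
    ProducerStatus.WAITING_FOR_START_NEXT_GENERATION: Action.ENQUEUE,  # idle
    ProducerStatus.STALLED: Action.ENQUEUE,  # blocked, needs cycle resolution
    ProducerStatus.QUEUED: Action.NO_ACTION,  # already pending
    ProducerStatus.EXECUTING: Action.NO_ACTION,  # will push data when it yields
    ProducerStatus.ERROR: Action.END_STREAM,
}


def on_consumer_stall(producer: ProducerStatus) -> Action:
    """Decide what a stalled consumer should do about its producer."""
    return STALL_ACTIONS[producer]
```

Keeping the table as data makes it easy to check that every status has exactly one action.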
- async _terminate_if_reached_limit(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]])
Check if a node has reached its generation limit and terminate if so.
- Parameters:
node – The node to check
- Raises:
TerminateLimitReached – If the node reached its generation limit
- add_node(node: FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]])
Add a node to the flow.
- Parameters:
node – The node to add
- add_nodes(nodes: list[FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]]])
Add multiple nodes to the flow.
- Parameters:
nodes – The nodes to add
- clear_defaulted_inputs(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]) → None
Remove defaulted input information for a node.
- Parameters:
node – The node to clear defaulted inputs for
- evaluate_node(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]) → Never
The persistent task that evaluates a node.
- This is the main execution function for a node. It:
Waits for the node to be ready to run
Gathers inputs and handles defaulted values
Calls the node with its inputs
Processes the result (either coroutine or async generator)
Propagates outputs to dependent nodes
Repeats
- Parameters:
node – The node to evaluate
- Returns:
Never returns; runs as a persistent coroutine
- Raises:
NotImplementedError – If the node does not return a coroutine or async generator
- is_input_defaulted(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], input_port: InputPortIndex) → bool
Check if a specific input port is using a default value.
- Parameters:
node – The node to check
input_port – The input port index to check
- Returns:
True if the input port is using a default value, False otherwise
- run_until_complete(stop_at_node_generation: dict[FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | DraftNode[Unpack[tuple[Any, ...]], tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = False, _debug_max_wait_time: float | None = None, context_factory: Callable[[FinalizedNode], Any] | None = None)
Execute the flow until completion or until a termination condition is met.
This is the main entry point for running a flow. It starts the resolution process and runs until all nodes have completed or a termination condition (like reaching a generation limit or an error) is met.
- Parameters:
stop_at_node_generation – Generation limit for nodes, either as a global limit or as a dict mapping nodes to their individual limits
terminate_on_node_error – Whether to terminate the flow if a node raises an exception
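_debug_max_wait_time – Maximum time in seconds to wait for I/O operations (useful for debugging)
context_factory – Callable that receives the currently executing node and returns its context; current_context() calls it. If it is not provided, current_context() raises RuntimeError.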
- Raises:
Exception – Any exception raised by nodes and not caught
TerminateLimitReached – When a node reaches its generation limit
- set_defaulted_inputs(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], defaulted_inputs: list[InputPortIndex]) → None
Mark specific inputs of a node as using default values.
When a node uses default values for inputs that are part of a cycle, this method records that information and increments the stitch level to prevent infinite recursion.
- Parameters:
node – The node with defaulted inputs
defaulted_inputs – List of input port indices using default values
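The defaulted-input bookkeeping (set_defaulted_inputs, is_input_defaulted, clear_defaulted_inputs) can be sketched as a per-node set of port indices. This is an assumption-laden sketch: nodes are strings, ports are ints, `DefaultedInputs` is hypothetical, and the per-node `stitch_level` counter only counts calls, since where Flowno stores the stitch level and how it uses it to stop recursion is not described.

```python
from collections import defaultdict


class DefaultedInputs:
    """Hypothetical per-node record of input ports that fell back to defaults."""

    def __init__(self) -> None:
        self._ports: dict[str, set[int]] = {}
        self.stitch_level: defaultdict[str, int] = defaultdict(int)

    def set_defaulted_inputs(self, node: str, ports: list[int]) -> None:
        self._ports[node] = set(ports)
        # Counts how often this node's cycle was stitched with defaults.
        self.stitch_level[node] += 1

    def is_input_defaulted(self, node: str, port: int) -> bool:
        return port in self._ports.get(node, set())

    def clear_defaulted_inputs(self, node: str) -> None:
        self._ports.pop(node, None)
```

Clearing removes only the port record; the counter is left alone so repeated stitching stays visible.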
- set_node_status(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], status: Queued | Executing | WaitingForStartNextGeneration | Error | Stalled) → None
Update the status of a node and notify instrumentation.
- Parameters:
node – The node whose status is being updated
status – The new status to set
- class flowno.core.flow.flow.FlowEventLoop(flow: Flow)
A custom event loop for handling Flow-specific commands.
- _handle_command(current_task_packet: tuple[Coroutine[Command, Any, Any], Any, Exception | None], command: Command) → bool
Handle the command yielded by the current task.
Returns True if the command was successfully handled.
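Tasks communicate with the loop by yielding command objects, as the *Command classes and the generator helpers such as _resume_nodes in this module suggest. A minimal generator-based sketch of that pattern, assuming the Flow-specific loop overrides _handle_command and defers unknown commands to its base class; `Park`, `Resume`, `BaseLoop`, `FlowLikeLoop`, and `resume` are hypothetical names, not Flowno's API.

```python
from collections import deque
from collections.abc import Generator
from dataclasses import dataclass, field


class Command:
    """Base type for values a task yields to its event loop."""


@dataclass
class Park(Command):
    """Suspend the yielding task until another task resumes it."""


@dataclass
class Resume(Command):
    """Make the named parked tasks runnable again."""

    names: list[str] = field(default_factory=list)


Task = Generator[Command, None, None]


class BaseLoop:
    def __init__(self) -> None:
        self.ready: deque[tuple[str, Task]] = deque()
        self.parked: dict[str, Task] = {}

    def spawn(self, name: str, task: Task) -> None:
        self.ready.append((name, task))

    def _handle_command(self, name: str, task: Task, command: Command) -> bool:
        if isinstance(command, Park):
            self.parked[name] = task
            return True
        return False  # not a command this loop understands

    def run(self) -> None:
        while self.ready:
            name, task = self.ready.popleft()
            try:
                command = next(task)
            except StopIteration:
                continue  # the task finished
            if not self._handle_command(name, task, command):
                raise RuntimeError(f"unhandled command: {command!r}")


class FlowLikeLoop(BaseLoop):
    def _handle_command(self, name: str, task: Task, command: Command) -> bool:
        if isinstance(command, Resume):
            for other in command.names:
                if other in self.parked:  # tasks that are not parked are left alone
                    self.ready.append((other, self.parked.pop(other)))
            self.ready.append((name, task))  # the resuming task keeps running
            return True
        return super()._handle_command(name, task, command)


def resume(names: list[str]) -> Generator[Resume, None, None]:
    """Helper that yields a single command, in the style of _resume_nodes."""
    yield Resume(names)


log: list[str] = []


def worker() -> Task:
    log.append("worker parked")
    yield Park()
    log.append("worker resumed")


def driver() -> Task:
    log.append("driver resumes worker")
    yield from resume(["worker"])
    log.append("driver done")


loop = FlowLikeLoop()
loop.spawn("worker", worker())
loop.spawn("driver", driver())
loop.run()
print(log)
```

As with _resume_nodes, resuming a task that is not currently parked does nothing in this sketch.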
- exception flowno.core.flow.flow.NodeExecutionError(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]])
Exception raised when a node execution fails.
- class flowno.core.flow.flow.NodeTaskAndStatus(task: Coroutine[Command, object, Never], status: Queued | Executing | WaitingForStartNextGeneration | Error | Stalled)
Container for a node’s task and its current status.
- _asdict()
Return a new dict which maps field names to their values.
- classmethod _make(iterable)
Make a new NodeTaskAndStatus object from a sequence or iterable
- _replace(**kwds)
Return a new NodeTaskAndStatus object replacing specified fields with new values
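_asdict, _make, and _replace are the standard named-tuple helpers, which suggests NodeTaskAndStatus is a typing.NamedTuple. The sketch below uses a hypothetical stand-in, `TaskAndStatus`, with the same field names and simplified field types, to show that updating a status means building a new entry.

```python
from typing import NamedTuple


class TaskAndStatus(NamedTuple):
    """Stand-in with the same field names as NodeTaskAndStatus."""

    task: object
    status: str


entry = TaskAndStatus(task="task-1", status="Queued")
# Named tuples are immutable: _replace builds a new entry, entry is unchanged.
updated = entry._replace(status="Executing")
print(updated._asdict())  # {'task': 'task-1', 'status': 'Executing'}
rebuilt = TaskAndStatus._make(["task-1", "Error"])
print(rebuilt)  # TaskAndStatus(task='task-1', status='Error')
```

Because entries are immutable, a status change has to be written back into whatever mapping holds them, for example the node_tasks dictionary.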
- class flowno.core.flow.flow.NodeTaskStatus
Represents the possible states of a node’s task within the flow execution.
- States:
Queued: The node is queued to be executed (in the event loop task queue).
Executing: The node task is currently executing (task.send/throw is being called).
WaitingForStartNextGeneration: The node is waiting to start its next generation.
Error: The node encountered an error during execution.
Stalled: The node is blocked waiting on input data.
- class Stalled(stalling_input: FinalizedInputPortRef[object])
Node is stalled waiting for input data.
- class flowno.core.flow.flow.ResumeNodesCommand(nodes: list[FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]])
Command to resume execution of one or more nodes.
- class flowno.core.flow.flow.StreamConsumerState(last_consumed_generation: tuple[int, ...] | None = None, cancelled_after_consuming_generation: tuple[int, ...] | None = None)
Tracks the consumption state of a stream consumer.
This state is used to determine when to return data, stall waiting for the producer, or raise StopAsyncIteration (stream complete).
- exception flowno.core.flow.flow.TerminateLimitReached
Exception raised when a node reaches its generation limit.
- class flowno.core.flow.flow.TerminateReachedLimitCommand
Command to terminate the flow because a node reached its generation limit.
- class flowno.core.flow.flow.TerminateWithExceptionCommand(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], exception: Exception)
Command to terminate the flow with an exception.
- class flowno.core.flow.flow.WaitForStartNextGenerationCommand(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], run_level: int = 0)
Command to wait for a node to start its next generation.
Warning
If this command is handled before the resolution queue is pushed with new nodes, and the resolution queue is empty, the resolution queue will be closed, causing the flow to terminate.
- flowno.core.flow.flow._resume_nodes(nodes: list[FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]]) → Generator[ResumeNodesCommand, None, None]
Resume the concurrent node tasks. Does not guarantee that the nodes will resume if already running.
- flowno.core.flow.flow._terminate_reached_limit() → Generator[TerminateReachedLimitCommand, None, None]
Coroutine that yields a command to terminate when a generation limit is reached.
- flowno.core.flow.flow._terminate_with_exception(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], exception: Exception) → Generator[TerminateWithExceptionCommand, None, None]
Coroutine that yields a command to terminate with an exception.
- flowno.core.flow.flow._wait_for_start_next_generation(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], run_level: int = 0) → Generator[WaitForStartNextGenerationCommand, None, None]
Coroutine that yields a command to wait for a node’s next generation.
- flowno.core.flow.flow.current_context() → Any
Get the context for the currently executing node. Calls the context factory provided to run_until_complete().
- Returns:
The NodeContext for the current node.
- Raises:
RuntimeError – If called outside a flow, outside a node, or if no context factory was provided.