flowno

Flowno: A Python DSL for building dataflow programs.

This module provides tools for creating concurrent, cyclic, and streaming dataflow programs.

Key features:
  • Node-based design with the @node decorator

  • Support for cyclic dependencies and streaming data

  • Built-in concurrency with a custom event loop

  • Type-checked node connections

Configure logging with environment variables:
  • FLOWNO_LOG_LEVEL: Set logging level (default: ERROR)

  • FLOWNO_LOG_TAG_FILTER: Filter logs by tags (default: ALL)

  • FLOWNO_LOG_FILE: Write logs to file when set (e.g., “flowno.log”)

  • FLOWNO_LOG_CONSOLE: Enable console logging (default: true, set to “false” to disable)

  • FLOWNO_LOG_PROPAGATE: Propagate logs to parent loggers (default: true, set to “false” to disable)

class flowno.AsyncQueue(maxsize: int | None = None)[source]

An asynchronous queue for the Flowno event loop.

This queue allows tasks to exchange data safely, with proper synchronization handled by the event loop. When used as an async iterator, it yields items until the queue is closed.

Parameters:

maxsize – The maximum number of items allowed in the queue. When the queue reaches this size, put() operations will block until items are removed. If None, the queue size is unbounded.

async close() None[source]

Close the queue, preventing further put operations.

After closing:
  • put() will raise QueueClosedError

  • get() will succeed until the queue is empty, then raise QueueClosedError

  • AsyncIterator interface will stop iteration when the queue is empty

close_nowait(event_loop: EventLoop) None[source]

Synchronously close the queue.

This method is designed to be called from within a command handler, where async operations are not possible. It handles waiter notification synchronously through the event loop.

Parameters:

event_loop – The event loop to use for notifying waiters

async get() _T[source]

Get an item from the queue.

If the queue is empty, this will wait until an item is put into the queue.

Returns:

The next item from the queue

Raises:

QueueClosedError – If the queue is closed and empty

is_closed() bool[source]

Check if the queue is closed.

Returns:

True if the queue is closed, False otherwise

async peek() _T[source]

Peek at the next item without removing it from the queue.

If the queue is empty, this will wait until an item is put into the queue.

Returns:

The next item from the queue (without removing it)

Raises:

QueueClosedError – If the queue is closed and empty

async put(item: _T) None[source]

Put an item into the queue.

If the queue is full and has a maxsize, this will wait until space is available.

Parameters:

item – The item to put into the queue

Raises:

QueueClosedError – If the queue is closed

until_empty() AsyncIterator[_T][source]

Get an async iterator that consumes all items until the queue is empty.

This iterator will close the queue automatically when all items are consumed, unless specified otherwise.

Returns:

An async iterator that yields items until the queue is empty

class flowno.Condition(lock: Lock | None = None)[source]

A condition variable for coordinating tasks based on arbitrary conditions.

A Condition is always associated with a Lock. It provides wait() and notify() operations that allow tasks to wait for a condition to become true and to signal when the condition changes.

Typical usage pattern:

    lock = Lock()
    condition = Condition(lock)

    # Waiter:
    async with condition:
        while not some_condition:
            await condition.wait()  # Atomically releases lock and waits
        # Lock is reacquired here

    # Notifier:
    async with condition:
        # Change the condition
        await condition.notify()  # or notify_all()

Note: Condition also supports async with syntax, which acquires/releases the associated lock automatically.

property lock: Lock

Get the associated lock.

async notify(n: int = 1) None[source]

Wake up one or more tasks waiting on this condition.

The task must hold the lock before calling notify(). Woken tasks are moved to the lock’s wait queue and will reacquire the lock in FIFO order.

Parameters:

n – Number of tasks to wake (default 1). Use notify_all() to wake all.

async notify_all() None[source]

Wake up all tasks waiting on this condition.

The task must hold the lock before calling notify_all(). All woken tasks are moved to the lock’s wait queue and will reacquire the lock in FIFO order.

notify_all_nowait(event_loop: EventLoop) int[source]

Synchronously notify all waiters on this condition.

This method is designed to be called from within a command handler, where async operations are not possible. It handles waiter notification synchronously through the event loop.

Unlike the normal notify_all path, this does NOT require holding the lock.

Parameters:

event_loop – The event loop to use for notifying waiters

Returns:

The number of waiters that were notified

notify_nowait(event_loop: EventLoop) bool[source]

Synchronously notify one waiter on this condition.

Parameters:

event_loop – The event loop to use for notifying waiters

Returns:

True if a waiter was notified, False otherwise

async wait() None[source]

Wait for the condition to be notified.

This method atomically releases the associated lock and blocks the task until another task calls notify() or notify_all(). When the task wakes up, it automatically reacquires the lock before returning.

The task must hold the lock before calling wait().

Typical usage is in a while loop checking the condition:

    async with lock:
        while not condition_is_met():
            await condition.wait()

class flowno.DraftGroupNode(*args: Unpack[tuple[Any, ...]])[source]

Minimal draft group node used for experimenting with template groups.

class flowno.DraftNode(*args: Unpack[tuple[object, ...]])[source]

Abstract Base class for all connectable draft nodes in the flow graph.

A DraftNode subclass instance represents a node that can be connected to other nodes to form a computational graph. It handles only input/output connections; the node must be wrapped in a FinalizedNode to be used in a running flow.

Warning

Do not use this class directly; use the @node decorator instead.

Examples

>>> from flowno import node, FlowHDL
>>> @node
... async def add(a: int, b: int) -> int:
...     return a + b
>>> with FlowHDL() as f:
...     f.result = add(1, f.result)

get_input_nodes() list[DraftNode[Unpack[tuple[object, ...]], tuple[object, ...]]][source]

Get all nodes connected to this node’s inputs.

get_output_nodes() list[DraftNode[Unpack[tuple[object, ...]], tuple[object, ...]]][source]

Get all nodes connected to this node’s outputs.

class flowno.Event[source]

A one-shot synchronization primitive for waking multiple waiting tasks.

An Event starts in the “not set” state. Tasks can wait() on the event, blocking until another task calls set(). Once set, the event remains set permanently, and all current and future wait() calls return immediately.

This primitive is ideal for signaling completion of an initialization phase or broadcasting a one-time notification to multiple consumers.

Examples

>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.primitives import spawn
>>> from flowno.core.event_loop.synchronization import Event
>>>
>>> async def waiter(name: str, event: Event):
...     print(f"{name}: Waiting...")
...     await event.wait()
...     print(f"{name}: Event set, proceeding!")
...     return name
>>>
>>> async def setter(event: Event):
...     print("Setter: About to set event")
...     await event.set()
...     print("Setter: Event set")
>>>
>>> async def main():
...     event = Event()
...
...     # Start multiple waiters
...     w1 = await spawn(waiter("Waiter1", event))
...     w2 = await spawn(waiter("Waiter2", event))
...
...     # Set the event
...     s = await spawn(setter(event))
...
...     # All waiters should proceed
...     await w1.join()
...     await w2.join()
...     await s.join()
>>>
>>> event_loop = EventLoop()
>>> event_loop.run_until_complete(main(), join=True)
Waiter1: Waiting...
Waiter2: Waiting...
Setter: About to set event
Setter: Event set
Waiter1: Event set, proceeding!
Waiter2: Event set, proceeding!

is_set() bool[source]

Check if the event is set.

Returns:

True if the event has been set, False otherwise.

async set() None[source]

Set the event, waking all waiting tasks.

Once set, the event remains set permanently. All current and future wait() calls will return immediately.

async wait() None[source]

Wait for the event to be set.

If the event is already set, return immediately. Otherwise, block until another task calls set().

class flowno.EventLoop[source]

The core event loop implementation for Flowno’s asynchronous execution model.

Manages task scheduling, I/O operations, and synchronization primitives for the dataflow runtime.

cancel(raw_task: Coroutine[Command, Any, Any]) bool[source]

Cancel a task.

Parameters:

raw_task – The task to cancel.

Returns:

True if the task was successfully cancelled; False if it was already finished or errored.

create_task(raw_task: Coroutine[Command, Any, Any]) TaskHandle[Command][source]

Create a new task handle for the given raw task and enqueue the task in the event loop’s task queue.

Parameters:

raw_task – The raw task to create a handle for.

Returns:

A TaskHandle object representing the created task.

has_living_tasks() bool[source]

Return True if there are any tasks still needing processing.

run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: Literal[False] = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) None[source]
run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: bool = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) _ReturnT

Run the event loop until the given root task is complete.

This method executes the main event loop, processing tasks, handling I/O operations, and managing task synchronization until the root task completes. It can optionally wait for all spawned tasks to finish as well.

Parameters:
  • root_task (RawTask[Command, Any, _ReturnT]) – The coroutine task to execute as the root of the execution graph.

  • join (bool) – When True, returns the result value of the root task. When False, returns None regardless of the task’s result. If the task raises an exception and join=True, the exception is re-raised.

  • wait_for_spawned_tasks (bool) – When True, continue running the event loop until all tasks spawned by the root task have completed. When False, stop as soon as the root task completes.

  • _debug_max_wait_time (float | None) – Optional timeout value in seconds used for debugging. Limits how long the event loop will wait for network or sleeping operations.

Returns:

If join=True, returns the result of the root task (of type _ReturnT).

If join=False, returns None.

Return type:

_ReturnT | None

Raises:
  • RuntimeError – If the event loop exits without completing the root task when join=True.

  • Exception – Any exception raised by the root task is propagated if join=True.

class flowno.Flow(is_finalized: bool = True)[source]

Dataflow graph execution engine.

The Flow class manages the execution of a dataflow graph, handling dependency resolution, node scheduling, and cycle breaking. It uses a custom event loop to execute nodes concurrently while respecting data dependencies.

Key features:
  • Automatic dependency-based scheduling

  • Cycle detection and resolution

  • Support for streaming data (run levels)

  • Concurrency management

unvisited

List of nodes that have not yet been visited during execution

Type:

list[flowno.core.node_base.FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]]]

visited

Set of nodes that have been visited

Type:

set[flowno.core.node_base.FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]]]

node_tasks

Dictionary mapping nodes to their tasks and status

Type:

dict[flowno.core.node_base.FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]], flowno.core.flow.flow.NodeTaskAndStatus]

resolution_queue

Queue of nodes waiting to be resolved

Type:

flowno.core.event_loop.queues.AsyncSetQueue[flowno.core.node_base.FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]]]

add_node(node: FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]])[source]

Add a node to the flow.

Parameters:

node – The node to add

add_nodes(nodes: list[FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]]])[source]

Add multiple nodes to the flow.

Parameters:

nodes – The nodes to add

clear_defaulted_inputs(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]) None[source]

Remove defaulted input information for a node.

Parameters:

node – The node to clear defaulted inputs for

evaluate_node(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]) Never[source]

The persistent task that evaluates a node.

This is the main execution function for a node. It:
  1. Waits for the node to be ready to run

  2. Gathers inputs and handles defaulted values

  3. Calls the node with its inputs

  4. Processes the result (either coroutine or async generator)

  5. Propagates outputs to dependent nodes

  6. Repeats

Parameters:

node – The node to evaluate

Returns:

Never returns; runs as a persistent coroutine

Raises:

NotImplementedError – If the node does not return a coroutine or async generator

is_input_defaulted(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], input_port: InputPortIndex) bool[source]

Check if a specific input port is using a default value.

Parameters:
  • node – The node to check

  • input_port – The input port index to check

Returns:

True if the input port is using a default value, False otherwise

run_until_complete(stop_at_node_generation: dict[FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | DraftNode[Unpack[tuple[Any, ...]], tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = False, _debug_max_wait_time: float | None = None, context_factory: Callable[[FinalizedNode], Any] | None = None)[source]

Execute the flow until completion or until a termination condition is met.

This is the main entry point for running a flow. It starts the resolution process and runs until all nodes have completed or a termination condition (like reaching a generation limit or an error) is met.

Parameters:
  • stop_at_node_generation – Generation limit for nodes, either as a global limit or as a dict mapping nodes to their individual limits

  • terminate_on_node_error – Whether to terminate the flow if a node raises an exception

  • _debug_max_wait_time – Maximum time in seconds to wait for I/O operations (useful for debugging)

  • context_factory – Optional callable invoked with each FinalizedNode to build the object returned by current_context() inside that node

Raises:

TerminateLimitReached – If a node reaches the generation limit given by stop_at_node_generation

set_defaulted_inputs(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], defaulted_inputs: list[InputPortIndex]) None[source]

Mark specific inputs of a node as using default values.

When a node uses default values for inputs that are part of a cycle, this method records that information and increments the stitch level to prevent infinite recursion.

Parameters:
  • node – The node with defaulted inputs

  • defaulted_inputs – List of input port indices using default values

set_node_status(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], status: Queued | Executing | WaitingForStartNextGeneration | Error | Stalled) None[source]

Update the status of a node and notify instrumentation.

Parameters:
  • node – The node whose status is being updated

  • status – The new status to set

class flowno.FlowHDL[source]

Context manager for constructing and executing dataflow graphs.

FlowHDL extends FlowHDLView with the ability to run the resulting Flow. Within the with block users may assign draft nodes to attributes and reference not-yet-defined nodes freely. When the context exits, all placeholders are resolved and the underlying Flow is finalized.

Example

>>> with FlowHDL() as f:
...     f.result = Add(f.a, f.b)
...     f.a = Source(1)
...     f.b = Source(2)
>>> f.run_until_complete()

User-defined attribute names should not start with an underscore.

Canonical:

flowno.core.flow_hdl.FlowHDL

KEYWORDS: ClassVar[list[str]] = ['KEYWORDS', 'run_until_complete', 'create_task', 'register_child_result']

Keywords that should not be treated as nodes in the graph.

create_task(raw_task: Coroutine[Command, Any, Any]) TaskHandle[Command][source]

Create a new task handle for the given raw task and enqueue the task in the event loop’s task queue.

Parameters:

raw_task – The raw task to create a handle for.

Returns:

A TaskHandle object representing the created task.

run_until_complete(stop_at_node_generation: dict[DraftNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = True, _debug_max_wait_time: float | None = None, context_factory: Callable[[FinalizedNode], Any] | None = None) None[source]

Run the flow until all nodes have completed processing.

Parameters:
  • stop_at_node_generation – Optional generation number or mapping of nodes to generation numbers to stop execution at

  • terminate_on_node_error – Whether to terminate the entire flow if any node raises an exception

  • _debug_max_wait_time – Maximum time to wait for nodes to complete (for debugging only)

  • context_factory – Optional callable invoked with each FinalizedNode to build the object returned by current_context() inside that node

class flowno.FlowHDLView(on_register_finalized_node: Callable[[FinalizedNode], None])[source]

Base implementation of the FlowHDL attribute protocol.

FlowHDLView acts like a simple namespace for draft nodes. Public attribute assignments are stored in self._nodes while private names (those starting with _) behave like normal Python attributes. Accessing an undefined public attribute before the view is finalized returns a NodePlaceholder so that connections can be declared before the target node is defined. Once finalized, attribute lookups behave normally and missing attributes raise AttributeError.

class FinalizationResult(nodes: dict[str, Any], finalized_nodes: dict[flowno.core.node_base.DraftNode[Unpack[tuple[object, ...]], tuple[object, ...]], flowno.core.node_base.FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]])[source]

register_child_result(result: FinalizationResult) None[source]

Register a finalized child context result with this view.

classmethod register_node(node: DraftNode[Unpack[_Ts], _ReturnTupleT_co]) None[source]

Register a draft node in the context stack.

class flowno.Lock[source]

A mutual exclusion lock for protecting critical sections.

A Lock provides mutual exclusion: only one task can hold the lock at a time. Tasks that attempt to acquire a held lock block until the lock is released. The lock uses FIFO ordering to prevent starvation.

Examples

>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.primitives import spawn
>>> from flowno.core.event_loop.synchronization import Lock
>>>
>>> async def critical_section(name: str, lock: Lock, shared_counter: list):
...     print(f"{name}: Waiting for lock")
...     async with lock:  # Automatically acquires and releases
...         print(f"{name}: Acquired lock, in critical section")
...         # Simulate work in critical section
...         shared_counter[0] += 1
...         print(f"{name}: Counter = {shared_counter[0]}")
...     print(f"{name}: Released lock")
>>>
>>> async def main():
...     lock = Lock()
...     counter = [0]
...
...     # Start two tasks that compete for the lock
...     task1 = await spawn(critical_section("Task1", lock, counter))
...     task2 = await spawn(critical_section("Task2", lock, counter))
...
...     await task1.join()
...     await task2.join()
...     return counter[0]
>>>
>>> event_loop = EventLoop()
>>> result = event_loop.run_until_complete(main(), join=True)
Task1: Waiting for lock
Task1: Acquired lock, in critical section
Task1: Counter = 1
Task1: Released lock
Task2: Waiting for lock
Task2: Acquired lock, in critical section
Task2: Counter = 2
Task2: Released lock
>>> print(result)
2

async acquire() None[source]

Acquire the lock.

If the lock is available, acquire it immediately and return. If the lock is held by another task, block until it becomes available. Tasks waiting for the lock are served in FIFO order.

is_locked() bool[source]

Check if the lock is currently held.

Returns:

True if the lock is held by any task, False otherwise.

async release() None[source]

Release the lock.

The lock must be held by the current task. If other tasks are waiting, the next task in FIFO order will acquire the lock.

Raises:

AssertionError – If the lock is not held by the current task (checked by event loop).

class flowno.SocketHandle(socket: socket)[source]

Wrapper around the built-in socket object.

This class provides methods that integrate with Flowno’s event loop, allowing socket operations to be performed asynchronously.

accept() Generator[SocketAcceptCommand, None, tuple[SocketHandle, tuple[Any, ...] | str]][source]

Accept a connection on a listening socket.

This coroutine yields a SocketAcceptCommand for the event loop to process. When the event loop detects an incoming connection, it resumes this coroutine.

Returns:

A tuple containing a new SocketHandle for the client connection and the client’s address.

bind(address: tuple[Any, ...] | str, /) None[source]

Bind the socket to the specified address.

Parameters:

address – The address (host, port) to bind to.

connect(address: tuple[Any, ...] | str, /) None[source]

Connect to a remote socket at the specified address.

This is a blocking operation. For non-blocking connections, use the socket primitives from the flowno module.

Parameters:

address – The address to connect to (host, port).

listen(backlog: int | None = None, /) None[source]

Enable a server socket to accept connections.

Parameters:

backlog – The number of unaccepted connections the system will allow before refusing new connections.

recv(bufsize: int) Generator[SocketRecvCommand, None, bytes][source]

Receive data from the socket.

This coroutine yields a SocketRecvCommand for the event loop to process. When data is available to read, the event loop resumes this coroutine.

Parameters:

bufsize – The maximum number of bytes to receive.

Returns:

The bytes received from the socket.

send(data: bytes) Generator[SocketSendCommand, None, int][source]

Send data to the socket.

Unlike sendAll, this sends data once and returns the number of bytes sent.

Parameters:

data – The bytes to send.

Returns:

The number of bytes sent.

sendAll(data: bytes) Generator[SocketSendCommand, None, None][source]

Send all data to the socket.

This coroutine continues yielding SocketSendCommand until all data is sent.

Parameters:

data – The bytes to send.

class flowno.Stream(input: FinalizedInputPortRef[_InputType], output: FinalizedOutputPortRef[_InputType])[source]

A stream of values from one node to another.

Streams connect nodes that produce multiple values over time (run_level > 0) to consuming nodes. They act as async iterators that yield values as they become available.

Type Parameters:

_InputType: The type of data being streamed

cancel() Generator[StreamCancelCommand, None, None][source]

Cancel this stream, causing the producer to receive StreamCancelled on next yield.

exception flowno.StreamCancelled(stream: Stream[Any], message: str = 'Stream was cancelled by consumer')[source]

Raised when a stream consumer cancels the stream.

exception flowno.TerminateLimitReached[source]

Exception raised when a node reaches its generation limit.

async flowno.azip(iterable: AsyncIterator[_T_co], /) AsyncGenerator[tuple[_T_co], None][source]
async flowno.azip(iterable1: AsyncIterator[_T1], iterable2: AsyncIterator[_T2], /) AsyncGenerator[tuple[_T1, _T2], None]
async flowno.azip(iterable1: AsyncIterator[_T1], iterable2: AsyncIterator[_T2], iterable3: AsyncIterator[_T3], /) AsyncGenerator[tuple[_T1, _T2, _T3], None]
async flowno.azip(iterable1: AsyncIterator[_T1], iterable2: AsyncIterator[_T2], iterable3: AsyncIterator[_T3], iterable4: AsyncIterator[_T4], /) AsyncGenerator[tuple[_T1, _T2, _T3, _T4], None]

Combine multiple async iterators, similar to the built-in zip() function.

This function takes multiple async iterators and yields tuples containing items from each iterator, advancing all iterators in lockstep. It stops when the shortest iterator is exhausted.

Parameters:

*args – One or more async iterators to combine

Yields:

Tuples containing one item from each iterator

Example

>>> async def gen1():
...     for i in range(3):
...         yield i
>>>
>>> async def gen2():
...     yield "a"
...     yield "b"
>>>
>>> # Will yield (0, "a") and (1, "b")
>>> async for pair in azip(gen1(), gen2()):
...     print(pair)

flowno.current_context() Any[source]

Get the context for the currently executing node. Calls the context factory provided to run_until_complete().

Returns:

The NodeContext for the current node.

Raises:

RuntimeError – If called outside a flow, outside a node, or if no context factory was provided.

flowno.current_event_loop() EventLoop | None[source]

Get the currently executing EventLoop instance.

Returns:

The current EventLoop instance, or None if not in an EventLoop context.

flowno.current_flow() Flow | None[source]

Get the currently executing Flow instance.

Returns:

The current Flow instance, or None if not in a Flow context.

flowno.current_node() FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | None[source]

Get the current node from the FlowEventLoop’s task context.

flowno.exit(return_value: Any = None, exception: Exception | None = None) Generator[ExitCommand, None, None][source]

Forcibly terminate the event loop.

This is a primitive that allows immediate termination of the event loop, regardless of any remaining tasks or operations. It’s similar to sys.exit() but specific to the Flowno event loop.

Parameters:
  • return_value – Optional value to return from run_until_complete (when join=True).

  • exception – Optional exception to raise from run_until_complete.

Returns:

This function never returns normally as it terminates the event loop.

Examples

>>> async def early_exit():
...     print("About to exit")
...     await exit()  # Terminates the event loop immediately
...     print("This will never be executed")
>>> async def exit_with_result():
...     await exit("success")  # Will be returned if join=True
>>> async def exit_with_error():
...     await exit(exception=ValueError("Something went wrong"))

flowno.node(func_or_cls: Callable[[T1], Coroutine[Any, Any, _ReturnT_co]], /) type[MonoNode1[T1, tuple[_ReturnT_co]]][source]
flowno.node(func_or_cls: Callable[[], Coroutine[Any, Any, None]], /) type[MonoNode0_0]
flowno.node(func_or_cls: Callable[[], Coroutine[Any, Any, _ReturnT_co]], /) type[MonoNode0_1[_ReturnT_co]]
flowno.node(func_or_cls: None = None, /, *, multiple_outputs: Literal[False] | None = None, stream_in: list[str] = EMPTY_LIST) node_meta_single_dec

Decorator that transforms async functions or classes into DraftNode subclasses.

Parameters:
  • func_or_cls – The async function or class to transform

  • multiple_outputs – Whether the node has multiple outputs

  • stream_in – List of input streams

Returns:

A DraftNode subclass or a node_meta decorator

Examples

Basic usage:

>>> from flowno import node
>>>
>>> @node
... async def Add(x: int, y: int) -> int:
...     return x + y
>>>
>>> add_node = Add(1, 2)
>>> print(add_node)  # DraftNode instance

With stream inputs:

>>> from flowno import node, Stream
>>>
>>> @node(stream_in=["a"])
... async def SumStream(x: int, a: Stream[int]) -> int:
...     total = x
...     async for value in a:
...         total += value
...     return total
>>>
>>> sum_stream_node = SumStream(1)
>>> print(sum_stream_node)  # DraftNode instance with stream input

With multiple outputs:

>>> from flowno import node
>>>
>>> @node(multiple_outputs=True)
... async def SumAndDiff(x: int, y: int) -> tuple[int, int]:
...     return x + y, x - y
>>>
>>> sum_and_diff_node = SumAndDiff(3, 1)
>>> print(sum_and_diff_node)  # DraftNode instance with multiple outputs

flowno.sleep(duration: float) Generator[SleepCommand, None, float][source]

Suspend the current task for the specified duration.

Parameters:

duration – The number of seconds to sleep.

Returns:

The actual time slept (which may be slightly longer than requested).

flowno.socket(family: AddressFamily | int = -1, type: SocketKind | int = -1, proto: int = -1, fileno: int | None = None, use_tls: bool = False, ssl_context: SSLContext | None = None, server_hostname: str | None = None) SocketHandle[source]

Create a new socket compatible with Flowno’s event loop.

Parameters:
  • family – The address family (default: AF_INET)

  • type – The socket type (default: SOCK_STREAM)

  • proto – The protocol number (default: 0)

  • fileno – If specified, the socket is created from an existing file descriptor

  • use_tls – When True, creates a TLS-wrapped socket

  • ssl_context – The SSL context to use (if use_tls=True)

  • server_hostname – The server hostname for TLS certificate validation

Returns:

A SocketHandle that can be used with the Flowno event loop.

flowno.spawn(raw_task: Coroutine[Any, Any, _T_co]) Generator[SpawnCommand[_T_co], Any, TaskHandle[_T_co]][source]

Spawn a new task to run concurrently with the current task.

Parameters:

raw_task – The coroutine to run as a new task.

Returns:

A TaskHandle that can be used to wait for the task to complete.