flowno
Flowno: A Python DSL for building dataflow programs.
This module provides tools for creating concurrent, cyclic, and streaming dataflow programs.
- Key features:
Node-based design with the @node decorator
Support for cyclic dependencies and streaming data
Built-in concurrency with a custom event loop
Type-checked node connections
- Configure logging with environment variables:
FLOWNO_LOG_LEVEL: Set logging level (default: ERROR)
FLOWNO_LOG_TAG_FILTER: Filter logs by tags (default: ALL)
FLOWNO_LOG_FILE: Write logs to file when set (e.g., “flowno.log”)
FLOWNO_LOG_CONSOLE: Enable console logging (default: true, set to “false” to disable)
FLOWNO_LOG_PROPAGATE: Propagate logs to parent loggers (default: true, set to “false” to disable)
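The logging variables listed above can be set from Python as well as from the shell. The following is a minimal sketch: the variable names come from the list above, while the chosen values and the decision to set them before importing flowno are assumptions (this page does not say when the variables are read).

```python
import os

# Variable names are taken from the list above; the values are illustrative.
flowno_logging_env = {
    "FLOWNO_LOG_LEVEL": "DEBUG",       # default is ERROR
    "FLOWNO_LOG_FILE": "flowno.log",   # write logs to this file
    "FLOWNO_LOG_CONSOLE": "false",     # disable console logging
}

# Assumption: setting the variables before `import flowno` is the safe order.
os.environ.update(flowno_logging_env)
```

Setting the same variables in the shell before launching the program should have the same effect.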
- class flowno.AsyncQueue(maxsize: int | None = None)[source]
An asynchronous queue for the Flowno event loop.
This queue allows tasks to exchange data safely, with proper synchronization handled by the event loop. When used as an async iterator, it yields items until the queue is closed.
- Parameters:
maxsize – The maximum number of items allowed in the queue. When the queue reaches this size, put() operations will block until items are removed. If None, the queue size is unbounded.
- async close() None[source]
Close the queue, preventing further put operations.
- After closing:
put() will raise QueueClosedError
get() will succeed until the queue is empty, then raise QueueClosedError
AsyncIterator interface will stop iteration when the queue is empty
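The close semantics listed above can be pictured with a toy model. This sketch is not Flowno's implementation: `ClosableQueue` and the local `QueueClosedError` are hypothetical stand-ins, it runs on the standard library's asyncio, and it raises instead of waiting when the queue is open but empty.

```python
import asyncio
from collections import deque


class QueueClosedError(Exception):
    """Local stand-in for the error named in the text above."""


class ClosableQueue:
    """Toy model of the documented close() behaviour (hypothetical class)."""

    def __init__(self) -> None:
        self._items = deque()
        self._closed = False

    async def put(self, item) -> None:
        if self._closed:
            raise QueueClosedError("put() after close()")
        self._items.append(item)

    async def get(self):
        if self._items:
            return self._items.popleft()  # draining still works after close()
        if self._closed:
            raise QueueClosedError("queue is closed and empty")
        raise RuntimeError("a real queue would wait here")

    async def close(self) -> None:
        self._closed = True


async def demo() -> list:
    log = []
    q = ClosableQueue()
    await q.put(1)
    await q.put(2)
    await q.close()
    try:
        await q.put(3)
    except QueueClosedError:
        log.append("put rejected")
    log.append(await q.get())
    log.append(await q.get())
    try:
        await q.get()
    except QueueClosedError:
        log.append("get rejected")
    return log


events = asyncio.run(demo())
```

The point of the model is the ordering: items queued before `close()` remain readable, and the error only appears once the queue is both closed and empty.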
- close_nowait(event_loop: EventLoop) None[source]
Synchronously close the queue.
This method is designed to be called from within a command handler, where async operations are not possible. It handles waiter notification synchronously through the event loop.
- Parameters:
event_loop – The event loop to use for notifying waiters
- async get() _T[source]
Get an item from the queue.
If the queue is empty, this will wait until an item is put into the queue.
- Returns:
The next item from the queue
- Raises:
QueueClosedError – If the queue is closed and empty
- is_closed() bool[source]
Check if the queue is closed.
- Returns:
True if the queue is closed, False otherwise
- async peek() _T[source]
Peek at the next item without removing it from the queue.
If the queue is empty, this will wait until an item is put into the queue.
- Returns:
The next item from the queue (without removing it)
- Raises:
QueueClosedError – If the queue is closed and empty
- async put(item: _T) None[source]
Put an item into the queue.
If the queue is full and has a maxsize, this will wait until space is available.
- Parameters:
item – The item to put into the queue
- Raises:
QueueClosedError – If the queue is closed
- until_empty() AsyncIterator[_T][source]
Get an async iterator that consumes all items until the queue is empty.
This iterator will close the queue automatically when all items are consumed, unless specified otherwise.
- Returns:
An async iterator that yields items until the queue is empty
- class flowno.Condition(lock: Lock | None = None)[source]
A condition variable for coordinating tasks based on arbitrary conditions.
A Condition is always associated with a Lock. It provides wait() and notify() operations that allow tasks to wait for a condition to become true and to signal when the condition changes.
- Typical usage pattern:
    lock = Lock()
    condition = Condition(lock)

    # Waiter:
    async with condition:
        while not some_condition:
            await condition.wait()  # Atomically releases lock and waits
        # Lock is reacquired here

    # Notifier:
    async with condition:
        # Change the condition
        await condition.notify()  # or notify_all()
Note: Condition also supports async with syntax, which acquires/releases the associated lock automatically.
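For a version of the waiter/notifier pattern that runs as-is, the sketch below uses the standard library's `asyncio.Condition` as a stand-in. It is an analogy only: Flowno's Condition runs on Flowno's own event loop and its `notify()` is awaited, whereas asyncio's `notify()` is a plain call. The `ready` flag and the `log` list are illustrative names.

```python
import asyncio


async def demo() -> list:
    log = []
    ready = False
    condition = asyncio.Condition()

    async def waiter() -> None:
        async with condition:
            # Re-check the predicate after every wake-up.
            while not ready:
                await condition.wait()  # releases the lock while waiting
            log.append("waiter saw ready")

    async def notifier() -> None:
        nonlocal ready
        async with condition:
            ready = True
            log.append("notifier set ready")
            condition.notify()  # asyncio's notify() is synchronous

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # give the waiter a chance to start waiting
    await notifier()
    await task
    return log


condition_log = asyncio.run(demo())
```

The `while` loop around `wait()` is the important part: the waiter only proceeds once it has reacquired the lock and confirmed the predicate itself.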
- async notify(n: int = 1) None[source]
Wake up one or more tasks waiting on this condition.
The task must hold the lock before calling notify(). Woken tasks are moved to the lock’s wait queue and will reacquire the lock in FIFO order.
- Parameters:
n – Number of tasks to wake (default 1). Use notify_all() to wake all.
- async notify_all() None[source]
Wake up all tasks waiting on this condition.
The task must hold the lock before calling notify_all(). All woken tasks are moved to the lock’s wait queue and will reacquire the lock in FIFO order.
- notify_all_nowait(event_loop: EventLoop) int[source]
Synchronously notify all waiters on this condition.
This method is designed to be called from within a command handler, where async operations are not possible. It handles waiter notification synchronously through the event loop.
Unlike the normal notify_all path, this does NOT require holding the lock.
- Parameters:
event_loop – The event loop to use for notifying waiters
- Returns:
The number of waiters that were notified
- notify_nowait(event_loop: EventLoop) bool[source]
Synchronously notify one waiter on this condition.
- Parameters:
event_loop – The event loop to use for notifying waiters
- Returns:
True if a waiter was notified, False otherwise
- async wait() None[source]
Wait for the condition to be notified.
This method atomically releases the associated lock and blocks the task until another task calls notify() or notify_all(). When the task wakes up, it automatically reacquires the lock before returning.
The task must hold the lock before calling wait().
- Typical usage is in a while loop checking the condition:
    async with lock:
        while not condition_is_met():
            await condition.wait()
- class flowno.DraftGroupNode(*args: Unpack[tuple[Any, ...]])[source]
Minimal draft group node used for experimenting with template groups.
- class flowno.DraftNode(*args: Unpack[tuple[object, ...]])[source]
Abstract Base class for all connectable draft nodes in the flow graph.
A DraftNode subclass instance represents a node that can be connected to other nodes to form a computational graph. It only handles input/output connections. The node must be wrapped in a FinalizedNode before it can be used in a running flow.
Warning
Do not use this class directly; use the @node decorator instead.
Examples
>>> from flowno import node, FlowHDL
>>> @node
... async def add(a: int, b: int) -> int:
...     return a + b
>>> with FlowHDL() as f:
...     f.result = add(1, f.result)
- class flowno.Event[source]
A one-shot synchronization primitive for waking multiple waiting tasks.
An Event starts in the “not set” state. Tasks can wait() on the event, blocking until another task calls set(). Once set, the event remains set permanently, and all current and future wait() calls return immediately.
This primitive is ideal for signaling completion of an initialization phase or broadcasting a one-time notification to multiple consumers.
Examples
>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.primitives import spawn
>>> from flowno.core.event_loop.synchronization import Event
>>>
>>> async def waiter(name: str, event: Event):
...     print(f"{name}: Waiting...")
...     await event.wait()
...     print(f"{name}: Event set, proceeding!")
...     return name
>>>
>>> async def setter(event: Event):
...     print("Setter: About to set event")
...     await event.set()
...     print("Setter: Event set")
>>>
>>> async def main():
...     event = Event()
...
...     # Start multiple waiters
...     w1 = await spawn(waiter("Waiter1", event))
...     w2 = await spawn(waiter("Waiter2", event))
...
...     # Set the event
...     s = await spawn(setter(event))
...
...     # All waiters should proceed
...     await w1.join()
...     await w2.join()
...     await s.join()
>>>
>>> event_loop = EventLoop()
>>> event_loop.run_until_complete(main(), join=True)
Waiter1: Waiting...
Waiter2: Waiting...
Setter: About to set event
Setter: Event set
Waiter1: Event set, proceeding!
Waiter2: Event set, proceeding!
- is_set() bool[source]
Check if the event is set.
- Returns:
True if the event has been set, False otherwise.
- class flowno.EventLoop[source]
The core event loop implementation for Flowno’s asynchronous execution model.
Manages task scheduling, I/O operations, and synchronization primitives for the dataflow runtime.
- cancel(raw_task: Coroutine[Command, Any, Any]) bool[source]
Cancel a task.
- Parameters:
raw_task – The task to cancel.
- Returns:
True if the task was successfully cancelled; False if it was already finished or errored.
- create_task(raw_task: Coroutine[Command, Any, Any]) TaskHandle[Command][source]
Create a new task handle for the given raw task and enqueue the task in the event loop’s task queue.
- Parameters:
raw_task – The raw task to create a handle for.
- Returns:
A TaskHandle object representing the created task.
- run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: Literal[False] = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) None[source]
- run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: bool = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) _ReturnT
Run the event loop until the given root task is complete.
This method executes the main event loop, processing tasks, handling I/O operations, and managing task synchronization until the root task completes. It can optionally wait for all spawned tasks to finish as well.
- Parameters:
root_task (RawTask[Command, Any, _ReturnT]) – The coroutine task to execute as the root of the execution graph.
join (bool) – When True, returns the result value of the root task. When False, returns None regardless of the task’s result. If the task raises an exception and join=True, the exception is re-raised.
wait_for_spawned_tasks (bool) – When True, continue running the event loop until all tasks spawned by the root task have completed. When False, stop as soon as the root task completes.
_debug_max_wait_time (float | None) – Optional timeout value in seconds used for debugging. Limits how long the event loop will wait for network or sleeping operations.
- Returns:
- If join=True, returns the result of the root task (of type _ReturnT).
If join=False, returns None.
- Return type:
_ReturnT | None
- Raises:
RuntimeError – If the event loop exits without completing the root task when join=True.
Exception – Any exception raised by the root task is propagated if join=True.
- class flowno.Flow(is_finalized: bool = True)[source]
Dataflow graph execution engine.
The Flow class manages the execution of a dataflow graph, handling dependency resolution, node scheduling, and cycle breaking. It uses a custom event loop to execute nodes concurrently while respecting data dependencies.
- Key features:
Automatic dependency-based scheduling
Cycle detection and resolution
Support for streaming data (run levels)
Concurrency management
- unvisited
List of nodes that have not yet been visited during execution
- visited
Set of nodes that have been visited
- node_tasks
Dictionary mapping nodes to their tasks and status
- Type:
dict[flowno.core.node_base.FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]], flowno.core.flow.flow.NodeTaskAndStatus]
- resolution_queue
Queue of nodes waiting to be resolved
- Type:
flowno.core.event_loop.queues.AsyncSetQueue[flowno.core.node_base.FinalizedNode[Unpack[tuple[object, …]], tuple[object, …]]]
- add_node(node: FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]])[source]
Add a node to the flow.
- Parameters:
node – The node to add
- add_nodes(nodes: list[FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]]])[source]
Add multiple nodes to the flow.
- Parameters:
nodes – The nodes to add
- clear_defaulted_inputs(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]) None[source]
Remove defaulted input information for a node.
- Parameters:
node – The node to clear defaulted inputs for
- evaluate_node(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]) Never[source]
The persistent task that evaluates a node.
- This is the main execution function for a node. It:
Waits for the node to be ready to run
Gathers inputs and handles defaulted values
Calls the node with its inputs
Processes the result (either coroutine or async generator)
Propagates outputs to dependent nodes
Repeats
- Parameters:
node – The node to evaluate
- Returns:
Never returns; runs as a persistent coroutine
- Raises:
NotImplementedError – If the node does not return a coroutine or async generator
- is_input_defaulted(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], input_port: InputPortIndex) bool[source]
Check if a specific input port is using a default value.
- Parameters:
node – The node to check
input_port – The input port index to check
- Returns:
True if the input port is using a default value, False otherwise
- run_until_complete(stop_at_node_generation: dict[FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | DraftNode[Unpack[tuple[Any, ...]], tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = False, _debug_max_wait_time: float | None = None, context_factory: Callable[[FinalizedNode], Any] | None = None)[source]
Execute the flow until completion or until a termination condition is met.
This is the main entry point for running a flow. It starts the resolution process and runs until all nodes have completed or a termination condition (like reaching a generation limit or an error) is met.
- Parameters:
stop_at_node_generation – Generation limit for nodes, either as a global limit or as a dict mapping nodes to their individual limits
terminate_on_node_error – Whether to terminate the flow if a node raises an exception
_debug_max_wait_time – Maximum time in seconds to wait for I/O operations (useful for debugging)
- Raises:
Exception – Any exception raised by nodes and not caught
TerminateLimitReached – When a node reaches its generation limit
- set_defaulted_inputs(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], defaulted_inputs: list[InputPortIndex]) None[source]
Mark specific inputs of a node as using default values.
When a node uses default values for inputs that are part of a cycle, this method records that information and increments the stitch level to prevent infinite recursion.
- Parameters:
node – The node with defaulted inputs
defaulted_inputs – List of input port indices using default values
- set_node_status(node: FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]], status: Queued | Executing | WaitingForStartNextGeneration | Error | Stalled) None[source]
Update the status of a node and notify instrumentation.
- Parameters:
node – The node whose status is being updated
status – The new status to set
- class flowno.FlowHDL[source]
Context manager for constructing and executing dataflow graphs.
FlowHDL extends FlowHDLView with the ability to run the resulting Flow. Within the with block users may assign draft nodes to attributes and reference not-yet-defined nodes freely. When the context exits, all placeholders are resolved and the underlying Flow is finalized.
Example
>>> with FlowHDL() as f:
...     f.result = Add(f.a, f.b)
...     f.a = Source(1)
...     f.b = Source(2)
>>> f.run_until_complete()
User defined attribute names should not start with an underscore.
- KEYWORDS: ClassVar[list[str]] = ['KEYWORDS', 'run_until_complete', 'create_task', 'register_child_result']
Keywords that should not be treated as nodes in the graph.
- create_task(raw_task: Coroutine[Command, Any, Any]) TaskHandle[Command][source]
Create a new task handle for the given raw task and enqueue the task in the event loop’s task queue.
- Parameters:
raw_task – The raw task to create a handle for.
- Returns:
A TaskHandle object representing the created task.
- run_until_complete(stop_at_node_generation: dict[DraftNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]], tuple[int, ...] | None] | tuple[int, ...] | None = (), terminate_on_node_error: bool = True, _debug_max_wait_time: float | None = None, context_factory: Callable[[FinalizedNode], Any] | None = None) None[source]
Run the flow until all nodes have completed processing.
- Parameters:
stop_at_node_generation – Optional generation number or mapping of nodes to generation numbers to stop execution at
terminate_on_node_error – Whether to terminate the entire flow if any node raises an exception
_debug_max_wait_time – Maximum time to wait for nodes to complete (for debugging only)
- class flowno.FlowHDLView(on_register_finalized_node: Callable[[FinalizedNode], None])[source]
Base implementation of the FlowHDL attribute protocol.
FlowHDLView acts like a simple namespace for draft nodes. Public attribute assignments are stored in self._nodes while private names (those starting with _) behave like normal Python attributes. Accessing an undefined public attribute before the view is finalized returns a NodePlaceholder so that connections can be declared before the target node is defined. Once finalized, attribute lookups behave normally and missing attributes raise AttributeError.
- class FinalizationResult(nodes: dict[str, Any], finalized_nodes: dict[flowno.core.node_base.DraftNode[Unpack[tuple[object, ...]], tuple[object, ...]], flowno.core.node_base.FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]]])[source]
- register_child_result(result: FinalizationResult) None[source]
Register a finalized child context result with this view.
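The attribute protocol described for FlowHDLView (public names stored as nodes, placeholders for names not yet defined, AttributeError after finalization) can be pictured with a toy namespace. This is a sketch under assumptions, not Flowno's code: `ToyView`, `Placeholder`, and the `_finalized` flag are hypothetical names chosen for illustration.

```python
class Placeholder:
    """Stands in for a node that has not been assigned yet (hypothetical)."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyView:
    """Toy namespace mimicking the documented attribute rules."""

    def __init__(self) -> None:
        # Bypass our own __setattr__ for the private bookkeeping fields.
        object.__setattr__(self, "_nodes", {})
        object.__setattr__(self, "_finalized", False)

    def __setattr__(self, name: str, value) -> None:
        if name.startswith("_"):
            object.__setattr__(self, name, value)  # private: normal attribute
        else:
            self._nodes[name] = value              # public: stored as a node

    def __getattr__(self, name: str):
        # Only called when normal attribute lookup fails.
        if name.startswith("_"):
            raise AttributeError(name)
        if name in self._nodes:
            return self._nodes[name]
        if self._finalized:
            raise AttributeError(name)
        return Placeholder(name)


view = ToyView()
pending = view.b           # not defined yet: a Placeholder comes back
view.b = "node-b"          # public assignment is stored in _nodes
resolved = view.b
view._finalized = True     # private name: set as a normal attribute

try:
    view.missing
    missing_raises = False
except AttributeError:
    missing_raises = True
```

The real class additionally resolves placeholders into connections when the context exits; the toy only shows the lookup rules.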
- class flowno.Lock[source]
A mutual exclusion lock for protecting critical sections.
A Lock provides mutual exclusion - only one task can hold the lock at a time. Tasks that attempt to acquire a held lock will block until the lock is released. The lock uses FIFO ordering to prevent starvation.
Examples
>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.primitives import spawn
>>> from flowno.core.event_loop.synchronization import Lock
>>>
>>> async def critical_section(name: str, lock: Lock, shared_counter: list):
...     print(f"{name}: Waiting for lock")
...     async with lock:  # Automatically acquires and releases
...         print(f"{name}: Acquired lock, in critical section")
...         # Simulate work in critical section
...         shared_counter[0] += 1
...         print(f"{name}: Counter = {shared_counter[0]}")
...     print(f"{name}: Released lock")
>>>
>>> async def main():
...     lock = Lock()
...     counter = [0]
...
...     # Start two tasks that compete for the lock
...     task1 = await spawn(critical_section("Task1", lock, counter))
...     task2 = await spawn(critical_section("Task2", lock, counter))
...
...     await task1.join()
...     await task2.join()
...     return counter[0]
>>>
>>> event_loop = EventLoop()
>>> result = event_loop.run_until_complete(main(), join=True)
Task1: Waiting for lock
Task1: Acquired lock, in critical section
Task1: Counter = 1
Task1: Released lock
Task2: Waiting for lock
Task2: Acquired lock, in critical section
Task2: Counter = 2
Task2: Released lock
>>> print(result)
2
- async acquire() None[source]
Acquire the lock.
If the lock is available, acquire it immediately and return. If the lock is held by another task, block until it becomes available. Tasks waiting for the lock are served in FIFO order.
- is_locked() bool[source]
Check if the lock is currently held.
- Returns:
True if the lock is held by any task, False otherwise.
- async release() None[source]
Release the lock.
The lock must be held by the current task. If other tasks are waiting, the next task in FIFO order will acquire the lock.
- Raises:
AssertionError – If the lock is not held by the current task (checked by event loop).
- class flowno.SocketHandle(socket: socket)[source]
Wrapper around the built-in socket object.
This class provides methods that integrate with Flowno’s event loop, allowing socket operations to be performed asynchronously.
- accept() Generator[SocketAcceptCommand, None, tuple[SocketHandle, tuple[Any, ...] | str]][source]
Accept a connection on a listening socket.
This coroutine yields a SocketAcceptCommand for the event loop to process. When the event loop detects an incoming connection, it resumes this coroutine.
- Returns:
A tuple containing a new SocketHandle for the client connection and the client’s address.
- bind(address: tuple[Any, ...] | str, /) None[source]
Bind the socket to the specified address.
- Parameters:
address – The address (host, port) to bind to.
- connect(address: tuple[Any, ...] | str, /) None[source]
Connect to a remote socket at the specified address.
This is a blocking operation. For non-blocking connections, use the socket primitives from the flowno module.
- Parameters:
address – The address to connect to (host, port).
- listen(backlog: int | None = None, /) None[source]
Enable a server socket to accept connections.
- Parameters:
backlog – The number of unaccepted connections the system will allow before refusing new connections.
- recv(bufsize: int) Generator[SocketRecvCommand, None, bytes][source]
Receive data from the socket.
This coroutine yields a SocketRecvCommand for the event loop to process. When data is available to read, the event loop resumes this coroutine.
- Parameters:
bufsize – The maximum number of bytes to receive.
- Returns:
The bytes received from the socket.
- class flowno.Stream(input: FinalizedInputPortRef[_InputType], output: FinalizedOutputPortRef[_InputType])[source]
A stream of values from one node to another.
Streams connect nodes that produce multiple values over time (run_level > 0) to consuming nodes. They act as async iterators that yield values as they become available.
- Type Parameters:
_InputType: The type of data being streamed
- exception flowno.StreamCancelled(stream: Stream[Any], message: str = 'Stream was cancelled by consumer')[source]
Raised when a stream consumer cancels the stream.
- exception flowno.TerminateLimitReached[source]
Exception raised when a node reaches its generation limit.
- async flowno.azip(iterable: AsyncIterator[_T_co], /) AsyncGenerator[tuple[_T_co], None][source]
- async flowno.azip(iterable1: AsyncIterator[_T1], iterable2: AsyncIterator[_T2], /) AsyncGenerator[tuple[_T1, _T2], None]
- async flowno.azip(iterable1: AsyncIterator[_T1], iterable2: AsyncIterator[_T2], iterable3: AsyncIterator[_T3], /) AsyncGenerator[tuple[_T1, _T2, _T3], None]
- async flowno.azip(iterable1: AsyncIterator[_T1], iterable2: AsyncIterator[_T2], iterable3: AsyncIterator[_T3], iterable4: AsyncIterator[_T4], /) AsyncGenerator[tuple[_T1, _T2, _T3, _T4], None]
Combine multiple async iterators, similar to the built-in zip() function.
This function takes multiple async iterators and yields tuples containing items from each iterator, advancing all iterators in lockstep. It stops when the shortest iterator is exhausted.
- Parameters:
*args – The async iterators to combine (the overloads above are typed for one to four iterators)
- Yields:
Tuples containing one item from each iterator
Example
>>> async def gen1():
...     for i in range(3):
...         yield i
>>>
>>> async def gen2():
...     yield "a"
...     yield "b"
>>>
>>> # Will yield (0, "a") and (1, "b")
>>> async for pair in azip(gen1(), gen2()):
...     print(pair)
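The lockstep behaviour described for azip can be sketched in plain Python. This is not Flowno's implementation: `azip_sketch` is a hypothetical name, it runs on the standard library's asyncio, and it awaits the iterators one after another (whether the real azip advances them concurrently is not stated on this page).

```python
import asyncio
from typing import Any, AsyncIterator


async def azip_sketch(*iterators: AsyncIterator[Any]) -> AsyncIterator[tuple]:
    """Yield tuples in lockstep; stop as soon as any iterator is exhausted."""
    if not iterators:
        return
    while True:
        row = []
        for it in iterators:
            try:
                row.append(await it.__anext__())
            except StopAsyncIteration:
                return  # the shortest iterator ends the whole zip
        yield tuple(row)


async def numbers():
    for i in range(3):
        yield i


async def letters():
    yield "a"
    yield "b"


async def collect() -> list:
    return [pair async for pair in azip_sketch(numbers(), letters())]


pairs = asyncio.run(collect())
```

Note that in this sketch the third item of `numbers()` is consumed before `letters()` reports exhaustion, so it is dropped, just as with the built-in zip().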
- flowno.current_context() Any[source]
Get the context for the currently executing node. Calls the context factory provided to run_until_complete().
- Returns:
The NodeContext for the current node.
- Raises:
RuntimeError – If called outside a flow, outside a node, or if no context factory was provided.
- flowno.current_event_loop() EventLoop | None[source]
Get the currently executing EventLoop instance.
- Returns:
The current EventLoop instance, or None if not in an EventLoop context.
- flowno.current_flow() Flow | None[source]
Get the currently executing Flow instance.
- Returns:
The current Flow instance, or None if not in a Flow context.
- flowno.current_node() FinalizedNode[Unpack[tuple[Any, ...]], tuple[Any, ...]] | None[source]
Get the current node from the FlowEventLoop’s task context.
- flowno.exit(return_value: Any = None, exception: Exception | None = None) Generator[ExitCommand, None, None][source]
Forcibly terminate the event loop.
This is a primitive that allows immediate termination of the event loop, regardless of any remaining tasks or operations. It’s similar to sys.exit() but specific to the Flowno event loop.
- Parameters:
return_value – Optional value to return from run_until_complete (when join=True).
exception – Optional exception to raise from run_until_complete.
- Returns:
This function never returns normally as it terminates the event loop.
Examples
>>> async def early_exit():
...     print("About to exit")
...     await exit()  # Terminates the event loop immediately
...     print("This will never be executed")

>>> async def exit_with_result():
...     await exit("success")  # Will be returned if join=True

>>> async def exit_with_error():
...     await exit(exception=ValueError("Something went wrong"))
- flowno.node(func_or_cls: Callable[[T1], Coroutine[Any, Any, _ReturnT_co]], /) type[MonoNode1[T1, tuple[_ReturnT_co]]][source]
- flowno.node(func_or_cls: Callable[[], Coroutine[Any, Any, None]], /) type[MonoNode0_0]
- flowno.node(func_or_cls: Callable[[], Coroutine[Any, Any, _ReturnT_co]], /) type[MonoNode0_1[_ReturnT_co]]
- flowno.node(func_or_cls: None = None, /, *, multiple_outputs: Literal[False] | None = None, stream_in: list[str] = EMPTY_LIST) node_meta_single_dec
Decorator that transforms async functions or classes into DraftNode subclasses.
- Parameters:
func_or_cls – The async function or class to transform
multiple_outputs – Whether the node has multiple outputs
stream_in – Names of the input parameters that receive streams (for example, stream_in=["a"] for a parameter a: Stream[int])
- Returns:
A DraftNode subclass or a node_meta decorator
Examples
Basic usage:
>>> from flowno import node
>>>
>>> @node
... async def Add(x: int, y: int) -> int:
...     return x + y
>>>
>>> add_node = Add(1, 2)
>>> print(add_node)  # DraftNode instance
With stream inputs:
>>> from flowno import node, Stream
>>>
>>> @node(stream_in=["a"])
... async def SumStream(x: int, a: Stream[int]) -> int:
...     total = x
...     async for value in a:
...         total += value
...     return total
>>>
>>> sum_stream_node = SumStream(1)
>>> print(sum_stream_node)  # DraftNode instance with stream input
With multiple outputs:
>>> from flowno import node
>>>
>>> @node(multiple_outputs=True)
... async def SumAndDiff(x: int, y: int) -> tuple[int, int]:
...     return x + y, x - y
>>>
>>> sum_and_diff_node = SumAndDiff(3, 1)
>>> print(sum_and_diff_node)  # DraftNode instance with multiple outputs
- flowno.sleep(duration: float) Generator[SleepCommand, None, float][source]
Suspend the current task for the specified duration.
- Parameters:
duration – The number of seconds to sleep.
- Returns:
The actual time slept (which may be slightly longer than requested).
- flowno.socket(family: AddressFamily | int = -1, type: SocketKind | int = -1, proto: int = -1, fileno: int | None = None, use_tls: bool = False, ssl_context: SSLContext | None = None, server_hostname: str | None = None) SocketHandle[source]
Create a new socket compatible with Flowno’s event loop.
- Parameters:
family – The address family (default: AF_INET)
type – The socket type (default: SOCK_STREAM)
proto – The protocol number (default: 0)
fileno – If specified, the socket is created from an existing file descriptor
use_tls – When True, creates a TLS-wrapped socket
ssl_context – The SSL context to use (if use_tls=True)
server_hostname – The server hostname for TLS certificate validation
- Returns:
A SocketHandle that can be used with the Flowno event loop.
- flowno.spawn(raw_task: Coroutine[Any, Any, _T_co]) Generator[SpawnCommand[_T_co], Any, TaskHandle[_T_co]][source]
Spawn a new task to run concurrently with the current task.
- Parameters:
raw_task – The coroutine to run as a new task.
- Returns:
A TaskHandle that can be used to wait for the task to complete.