flowno.core.event_loop.event_loop
Custom event loop implementation for Flowno’s asynchronous execution model.
This module provides a lightweight, cooperative multitasking event loop that handles: - Task scheduling and management - Sleeping/timing operations - Network socket operations - Asynchronous queue operations - Task joining and cancellation
The EventLoop class is the central component of Flowno’s asynchronous execution, implementing a command-based coroutine system similar to Python’s asyncio.
This can be used as a standalone event loop without the rest of the Flowno runtime.
- class flowno.core.event_loop.event_loop.EventLoop[source]
The core event loop implementation for Flowno’s asynchronous execution model.
Manages task scheduling, I/O operations, and synchronization primitives for the dataflow runtime.
- _dump_debug_info(reason: str = 'Signal received') None[source]
Log detailed debug information about the current state of the event loop.
This method provides comprehensive debugging information useful when the event loop is interrupted or appears to be stuck.
- Parameters:
reason – The reason for dumping debug info (e.g., “SIGINT received”)
- _handle_command(current_task_packet: tuple[Coroutine[Command, Any, Any], Any, Exception | None], command: Command) bool[source]
Handle the command yielded by the current task.
Returns True if the command was successfully handled.
- _install_signal_handlers() None[source]
Install signal handlers for debugging interrupted event loops.
- _on_task_after_send(task: Coroutine[Command, Any, Any], value: Any, command: Command) None[source]
Hook called after successfully sending a value to a task.
Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.
- Parameters:
task – The task that received the value
value – The value that was sent to the task
command – The command yielded by the task
- _on_task_after_throw(task: Coroutine[Command, Any, Any], exception: Exception, command: Command) None[source]
Hook called after successfully throwing an exception into a task.
Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.
- Parameters:
task – The task that received the exception
exception – The exception that was thrown into the task
command – The command yielded by the task
- _on_task_before_send(task: Coroutine[Command, Any, Any], value: Any) None[source]
Hook called before sending a value to a task.
Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.
- Parameters:
task – The task that will receive the value
value – The value being sent to the task
- _on_task_before_throw(task: Coroutine[Command, Any, Any], exception: Exception) None[source]
Hook called before throwing an exception into a task.
Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.
- Parameters:
task – The task that will receive the exception
exception – The exception being thrown into the task
- _on_task_cancelled(task: Coroutine[Command, Any, Any], exception: Exception) None[source]
Hook called when a task is cancelled.
Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.
- Parameters:
task – The task that was cancelled
exception – The TaskCancelled exception
- _on_task_completed(task: Coroutine[Command, Any, Any], result: Any) None[source]
Hook called when a task completes successfully.
Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.
- Parameters:
task – The task that completed
result – The return value of the task
- _on_task_error(task: Coroutine[Command, Any, Any], exception: Exception) None[source]
Hook called when a task raises an exception.
Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.
- Parameters:
task – The task that raised the exception
exception – The exception that was raised
- _remove_task_from_wait_states(raw_task: Coroutine[Command, Any, Any]) None[source]
Remove a task from all wait states (sleeping, network, synchronization primitives).
This is used during task cancellation to ensure the task is no longer waiting on any resource and can be immediately scheduled to process the cancellation.
- Parameters:
raw_task – The task to remove from wait states.
- cancel(raw_task: Coroutine[Command, Any, Any]) bool[source]
Cancel a task.
- Parameters:
raw_task – The task to cancel.
- Returns:
True if the task was successfully cancelled; False if it was already finished or errored.
- create_task(raw_task: Coroutine[Command, Any, Any]) TaskHandle[Command][source]
Create a new task handle for the given raw task and enqueue the task in the event loop’s task queue.
- Parameters:
raw_task – The raw task to create a handle for.
- Returns:
A TaskHandle object representing the created task.
- run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: Literal[False] = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) None[source]
- run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: bool = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) _ReturnT
Run the event loop until the given root task is complete.
This method executes the main event loop, processing tasks, handling I/O operations, and managing task synchronization until the root task completes. It can optionally wait for all spawned tasks to finish as well.
- Parameters:
root_task (RawTask[Command, Any, _ReturnT]) – The coroutine task to execute as the root of the execution graph.
join (bool) – When True, returns the result value of the root task. When False, returns None regardless of the task’s result. If the task raises an exception and join=True, the exception is re-raised.
wait_for_spawned_tasks (bool) – When True, continue running the event loop until all tasks spawned by the root task have completed. When False, stop as soon as the root task completes.
_debug_max_wait_time (float | None) – Optional timeout value in seconds used for debugging. Limits how long the event loop will wait for network or sleeping operations.
- Returns:
- If join=True, returns the result of the root task (of type _ReturnT).
If join=False, returns None.
- Return type:
_ReturnT | None
- Raises:
RuntimeError – If the event loop exits without completing the root task when join=True.
Exception – Any exception raised by the root task is propagated if join=True.