flowno.core.event_loop.event_loop

Custom event loop implementation for Flowno’s asynchronous execution model.

This module provides a lightweight, cooperative multitasking event loop that handles:

  • Task scheduling and management

  • Sleeping/timing operations

  • Network socket operations

  • Asynchronous queue operations

  • Task joining and cancellation

The EventLoop class is the central component of Flowno’s asynchronous execution, implementing a command-based coroutine system similar to Python’s asyncio.

The event loop can also be used on its own, without the rest of the Flowno runtime.
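To make the phrase "command-based coroutine system" concrete, here is a minimal sketch in plain Python. It is not Flowno's implementation: the `YieldControl` command, the `yield_control()` awaitable, and the `run()` driver are hypothetical names invented for this illustration. The idea it shows is that a task suspends by yielding a command object, and the loop decides what to do with the task based on that command.

```python
import types
from collections import deque


class YieldControl:
    """Hypothetical command: 'reschedule me after the other ready tasks'."""


@types.coroutine
def yield_control():
    # A bare `yield` inside a @types.coroutine generator hands the command
    # object to whoever is driving the coroutine with send().
    yield YieldControl()


def run(*tasks):
    """Drive the tasks round-robin; return their results in completion order."""
    ready = deque(tasks)
    results = []
    while ready:
        task = ready.popleft()
        try:
            command = task.send(None)  # resume the task until its next command
        except StopIteration as stop:
            results.append(stop.value)  # the task returned
            continue
        if isinstance(command, YieldControl):
            ready.append(task)  # cooperative: go to the back of the queue
        else:
            raise RuntimeError(f"unknown command: {command!r}")
    return results


async def worker(name, steps, log):
    for i in range(steps):
        log.append(f"{name}{i}")
        await yield_control()
    return name


log = []
finished = run(worker("a", 2, log), worker("b", 1, log))
# log is ["a0", "b0", "a1"]; finished is ["b", "a"]
```

A real loop would recognize more command types (sleeping, socket readiness, queue operations, joins) and park the task in the matching wait state instead of always re-queuing it.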

class flowno.core.event_loop.event_loop.EventLoop

The core event loop implementation for Flowno’s asynchronous execution model.

Manages task scheduling, I/O operations, and synchronization primitives for the dataflow runtime.

_dump_debug_info(reason: str = 'Signal received') -> None

Log detailed debug information about the current state of the event loop.

This method provides comprehensive debugging information useful when the event loop is interrupted or appears to be stuck.

Parameters:

reason – The reason for dumping debug info (e.g., “SIGINT received”)

_handle_command(current_task_packet: tuple[Coroutine[Command, Any, Any], Any, Exception | None], command: Command) -> bool

Handle the command yielded by the current task.

Returns True if the command was successfully handled.

_install_signal_handlers() -> None

Install signal handlers for debugging interrupted event loops.

_on_task_after_send(task: Coroutine[Command, Any, Any], value: Any, command: Command) -> None

Hook called after successfully sending a value to a task.

Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.

Parameters:
  • task – The task that received the value

  • value – The value that was sent to the task

  • command – The command yielded by the task

_on_task_after_throw(task: Coroutine[Command, Any, Any], exception: Exception, command: Command) -> None

Hook called after successfully throwing an exception into a task.

Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.

Parameters:
  • task – The task that received the exception

  • exception – The exception that was thrown into the task

  • command – The command yielded by the task

_on_task_before_send(task: Coroutine[Command, Any, Any], value: Any) -> None

Hook called before sending a value to a task.

Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.

Parameters:
  • task – The task that will receive the value

  • value – The value being sent to the task

_on_task_before_throw(task: Coroutine[Command, Any, Any], exception: Exception) -> None

Hook called before throwing an exception into a task.

Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.

Parameters:
  • task – The task that will receive the exception

  • exception – The exception being thrown into the task

_on_task_cancelled(task: Coroutine[Command, Any, Any], exception: Exception) -> None

Hook called when a task is cancelled.

Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.

Parameters:
  • task – The task that was cancelled

  • exception – The TaskCancelled exception

_on_task_completed(task: Coroutine[Command, Any, Any], result: Any) -> None

Hook called when a task completes successfully.

Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.

Parameters:
  • task – The task that completed

  • result – The return value of the task

_on_task_error(task: Coroutine[Command, Any, Any], exception: Exception) -> None

Hook called when a task raises an exception.

Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation.

Parameters:
  • task – The task that raised the exception

  • exception – The exception that was raised
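The hook methods above follow the template-method pattern: the loop calls them at fixed points, and a subclass overrides only the ones it cares about. The sketch below illustrates that pattern with a toy `MiniLoop` class, which is a hypothetical stand-in and not Flowno's `EventLoop`. Only the hook names are taken from this page; the `step()` method and the do-nothing defaults are assumptions (the real defaults delegate to the current instrumentation).

```python
class MiniLoop:
    """Toy stand-in for a loop with overridable hooks (not Flowno's EventLoop)."""

    def _on_task_before_send(self, task, value):
        pass  # assumption: the toy default does nothing

    def _on_task_completed(self, task, result):
        pass

    def _on_task_error(self, task, exception):
        pass

    def step(self, task, value=None):
        """Resume `task` once, calling the hooks around the send."""
        self._on_task_before_send(task, value)
        try:
            return task.send(value)
        except StopIteration as stop:
            self._on_task_completed(task, stop.value)
        except Exception as exc:
            self._on_task_error(task, exc)
        return None


class RecordingLoop(MiniLoop):
    """Subclass that records every hook call instead of ignoring it."""

    def __init__(self):
        self.events = []

    def _on_task_before_send(self, task, value):
        self.events.append(("before_send", value))

    def _on_task_completed(self, task, result):
        self.events.append(("completed", result))

    def _on_task_error(self, task, exception):
        self.events.append(("error", type(exception).__name__))


async def ok():
    return 42


async def boom():
    raise ValueError("bad input")


loop = RecordingLoop()
loop.step(ok())
loop.step(boom())
# loop.events now holds one before_send entry per step, followed by the
# "completed" entry for ok() and the "error" entry for boom().
```

Keeping the hooks as ordinary methods means tracing or metrics can be added in a subclass without touching the scheduling logic.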

_remove_task_from_wait_states(raw_task: Coroutine[Command, Any, Any]) -> None

Remove a task from all wait states (sleeping, network, synchronization primitives).

This is used during task cancellation to ensure the task is no longer waiting on any resource and can be immediately scheduled to process the cancellation.

Parameters:

raw_task – The task to remove from wait states.

_uninstall_signal_handlers() -> None

Restore default signal handlers.

cancel(raw_task: Coroutine[Command, Any, Any]) -> bool

Cancel a task.

Parameters:

raw_task – The task to cancel.

Returns:

True if the task was successfully cancelled; False if it was already finished or errored.
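The return-value contract of `cancel` can be illustrated with a plain coroutine. This is a sketch under stated assumptions, not Flowno's code: the `TaskCancelled` class here is a local stand-in, `wait_forever()` and `job()` are hypothetical, and this `cancel()` throws the exception into the coroutine synchronously, whereas the real loop removes the task from its wait states and schedules it so that it can process the cancellation.

```python
import inspect
import types


class TaskCancelled(Exception):
    """Local stand-in for the exception delivered to a cancelled task."""


@types.coroutine
def wait_forever():
    # Hypothetical awaitable that never completes on its own.
    while True:
        yield "waiting"


async def job(log):
    try:
        await wait_forever()
    except TaskCancelled:
        log.append("cleanup")  # the task gets a chance to release resources
        raise


def cancel(task):
    """Return True if the task was cancelled, False if it had already ended."""
    if inspect.getcoroutinestate(task) == inspect.CORO_CLOSED:
        return False  # already finished or errored
    try:
        task.throw(TaskCancelled())
    except (TaskCancelled, StopIteration):
        pass  # the task ended in response to the cancellation
    else:
        task.close()  # the task ignored the request; force it shut
    return True


log = []
task = job(log)
task.send(None)       # run the task until it parks on wait_forever()
first = cancel(task)  # True: the task was still alive
second = cancel(task) # False: nothing left to cancel
```

Delivering cancellation as an exception lets `try`/`except` and `try`/`finally` blocks inside the task run their cleanup code before the task ends.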

create_task(raw_task: Coroutine[Command, Any, Any]) -> TaskHandle[Command]

Create a new task handle for the given raw task and enqueue the task in the event loop’s task queue.

Parameters:

raw_task – The raw task to create a handle for.

Returns:

A TaskHandle object representing the created task.

has_living_tasks() -> bool

Return True if any tasks still need processing.

run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: Literal[False] = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) -> None
run_until_complete(root_task: Coroutine[Command, Any, _ReturnT], join: bool = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None) -> _ReturnT

Run the event loop until the given root task is complete.

This method executes the main event loop, processing tasks, handling I/O operations, and managing task synchronization until the root task completes. It can optionally wait for all spawned tasks to finish as well.

Parameters:
  • root_task (RawTask[Command, Any, _ReturnT]) – The coroutine task to execute as the root of the execution graph.

  • join (bool) – When True, returns the result value of the root task. When False, returns None regardless of the task’s result. If the task raises an exception and join=True, the exception is re-raised.

  • wait_for_spawned_tasks (bool) – When True, continue running the event loop until all tasks spawned by the root task have completed. When False, stop as soon as the root task completes.

  • _debug_max_wait_time (float | None) – Optional timeout value in seconds used for debugging. Limits how long the event loop will wait for network or sleeping operations.

Returns:

If join=True, returns the result of the root task (of type _ReturnT).

If join=False, returns None.

Return type:

_ReturnT | None

Raises:
  • RuntimeError – If the event loop exits without completing the root task when join=True.

  • Exception – Any exception raised by the root task is propagated if join=True.
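The interaction between `join` and `wait_for_spawned_tasks` described above can be sketched with a toy loop. Everything in this block is a hypothetical stand-in written for illustration: the `Spawn` command, the `spawn()` awaitable, and this simplified `run_until_complete()` are not Flowno's implementation, and the toy version has no sleeping, I/O, or timeout handling.

```python
import types
from collections import deque


class Spawn:
    """Hypothetical command asking the loop to schedule another coroutine."""

    def __init__(self, coro):
        self.coro = coro


@types.coroutine
def spawn(coro):
    yield Spawn(coro)


def run_until_complete(root, join=False, wait_for_spawned_tasks=True):
    ready = deque([root])
    root_done, result, error = False, None, None
    while ready:
        task = ready.popleft()
        try:
            command = task.send(None)
        except StopIteration as stop:
            if task is root:
                root_done, result = True, stop.value
        except Exception as exc:
            # Toy simplification: errors in spawned tasks are dropped.
            if task is root:
                root_done, error = True, exc
        else:
            ready.append(task)  # the yielding task stays runnable
            if isinstance(command, Spawn):
                ready.append(command.coro)  # schedule the child after it
        if root_done and not wait_for_spawned_tasks:
            break  # stop as soon as the root task is finished
    for leftover in ready:
        leftover.close()  # discard tasks that never got to finish
    if not join:
        return None  # result and error are both ignored
    if error is not None:
        raise error  # join=True re-raises the root task's exception
    return result


async def child(log):
    log.append("child ran")


async def root(log):
    await spawn(child(log))
    return "root result"


log_wait = []
joined = run_until_complete(root(log_wait), join=True)

log_nowait = []
ignored = run_until_complete(root(log_nowait), wait_for_spawned_tasks=False)
# joined is "root result" and log_wait is ["child ran"];
# ignored is None and log_nowait is [] because the child was never run.
```

In this sketch the two flags are independent: `join` only controls what the caller gets back, while `wait_for_spawned_tasks` only controls when the loop stops.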

flowno.core.event_loop.event_loop.current_event_loop() -> EventLoop | None

Get the currently executing EventLoop instance.

Returns:

The current EventLoop instance, or None if not in an EventLoop context.

flowno.core.event_loop.event_loop.current_task() -> Coroutine[Command, Any, Any] | None

Get the currently executing task in the event loop.

Returns:

The currently executing task, or None if called outside a task context.
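One common way to provide accessors like these is for the loop to record itself in module-level state while it is stepping a task and to clear that state afterwards. The sketch below shows that idea with a hypothetical `TinyLoop`. It is an assumption about the general technique, not a description of how Flowno stores the current loop or task (it might use thread-local storage or another mechanism).

```python
_current_loop = None  # module-level state, set only while a loop is running


def current_event_loop():
    """Return the loop that is running right now, or None outside a loop."""
    return _current_loop


class TinyLoop:
    """Hypothetical loop that runs one coroutine that never suspends."""

    def run(self, coro):
        global _current_loop
        previous, _current_loop = _current_loop, self
        try:
            coro.send(None)
        except StopIteration as stop:
            return stop.value
        finally:
            _current_loop = previous  # restore the state even on error


async def who_is_running():
    return current_event_loop()


loop = TinyLoop()
inside = loop.run(who_is_running())  # the loop itself
outside = current_event_loop()       # None: no loop is running here
```

Restoring the previous value in `finally` keeps the accessor accurate when the task raises and when loops are nested.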