# Source code for flowno.core.event_loop.event_loop

"""
Custom event loop implementation for Flowno's asynchronous execution model.

This module provides a lightweight, cooperative multitasking event loop that handles:
- Task scheduling and management
- Sleeping/timing operations
- Network socket operations
- Asynchronous queue operations
- Task joining and cancellation

The EventLoop class is the central component of Flowno's asynchronous execution,
implementing a command-based coroutine system similar to Python's asyncio.

This can be used as a standalone event loop without the rest of the Flowno runtime.
"""

from contextvars import ContextVar
import heapq
import logging
import selectors
import signal
import socket
import threading
from collections import defaultdict, deque
from timeit import default_timer as timer
from typing import Any, Literal, TypeVar, cast

from flowno.core.event_loop.commands import (
    CancelCommand,
    Command,
    ConditionNotifyCommand,
    ConditionWaitCommand,
    EventSetCommand,
    EventWaitCommand,
    ExitCommand,
    JoinCommand,
    LockAcquireCommand,
    LockReleaseCommand,
    SleepCommand,
    SocketAcceptCommand,
    SocketCommand,
    SocketRecvCommand,
    SocketSendCommand,
    SpawnCommand,
)
from flowno.core.event_loop.instrumentation import (
    InstrumentationMetadata,
    ReadySocketInstrumentationMetadata,
    TaskSendMetadata,
    TaskThrowMetadata,
    get_current_instrument,
)
from flowno.core.event_loop.queues import (
    AsyncQueue,
    QueueClosedError,
)
from flowno.core.event_loop.selectors import sel
from flowno.core.event_loop.tasks import TaskCancelled, TaskHandle
from flowno.core.event_loop.types import (
    DeltaTime,
    RawTask,
    RawTaskPacket,
    TaskHandlePacket,
    Time,
)
from typing_extensions import overload

# Module-level logger named after this module's dotted path.
logger: logging.Logger = logging.getLogger(__name__)

# Type variable for the value produced by the root task passed to
# EventLoop.run_until_complete().
_ReturnT = TypeVar("_ReturnT")

# Context variable naming the task the event loop is currently stepping.
# The loop sets it immediately before each send()/throw() into a task and
# resets it afterwards; outside that window it holds its default, None.
_current_task: ContextVar[RawTask[Command, Any, Any] | None] = ContextVar(
    "_current_task",
    default=None,
)

def current_task() -> RawTask[Command, Any, Any] | None:
    """
    Return the task that the event loop is currently stepping.

    The value is read from the ``_current_task`` context variable, which the
    event loop sets right before it sends a value to (or throws into) a task
    and resets once that step returns.

    Returns:
        The currently executing task, or None if called outside a task context.
    """
    running = _current_task.get()
    return running
# The EventLoop whose run loop is currently executing. It is assigned when the
# loop core starts and cleared back to None when the loop core exits.
_current_event_loop: "EventLoop | None" = None


def current_event_loop() -> "EventLoop | None":
    """
    Return the EventLoop instance that is currently running, if any.

    Returns:
        The current EventLoop instance, or None if not in an EventLoop context.
    """
    # Only reading the module global here, so no ``global`` declaration is needed.
    return _current_event_loop
[docs] class EventLoop: """ The core event loop implementation for Flowno's asynchronous execution model. Manages task scheduling, I/O operations, and synchronization primitives for the dataflow runtime. """ def __init__(self) -> None: self.tasks: deque[RawTaskPacket[Command, Any, object, Exception]] = deque() self.sleeping: list[tuple[Time, RawTask[SleepCommand, None, DeltaTime]]] = [] self.watching_task: defaultdict[ RawTask[Command, object, object], list[RawTask[Command, object, object]] ] = defaultdict(list) self.waiting_on_network: list[RawTask[SocketCommand, Any, Any]] = [] # Synchronization primitive waiters self.event_waiters: defaultdict[Any, set[RawTask[Command, Any, Any]]] = defaultdict(set) self.lock_waiters: defaultdict[Any, deque[RawTask[Command, Any, Any]]] = defaultdict(deque) self.condition_waiters: defaultdict[Any, set[RawTask[Command, Any, Any]]] = defaultdict(set) self.finished: dict[RawTask[Command, Any, Any], object] = {} self.exceptions: dict[RawTask[Command, Any, Any], Exception] = {} self.cancelled: set[RawTask[Command, Any, Any]] = set() # Track tasks with pending cancellation (TaskCancelled already queued) self.pending_cancellation: set[RawTask[Command, Any, Any]] = set() # Track which watchers are waiting due to cancel() vs join() self.cancel_waiters: defaultdict[ RawTask[Command, object, object], set[RawTask[Command, object, object]] ] = defaultdict(set) self._debug_max_wait_time: float | None = None self._loop_thread: threading.Thread | None = None self._wakeup_reader, self._wakeup_writer = socket.socketpair() self._wakeup_reader.setblocking(False) self._wakeup_writer.setblocking(False) self._exit_requested: tuple[bool, object, Exception | None] = ( False, None, None, ) self._signal_handlers_installed = False
[docs] def _dump_debug_info(self, reason: str = "Signal received") -> None: """ Log detailed debug information about the current state of the event loop. This method provides comprehensive debugging information useful when the event loop is interrupted or appears to be stuck. Args: reason: The reason for dumping debug info (e.g., "SIGINT received") """ logger.warning(f"=== EVENT LOOP DEBUG INFO ({reason}) ===") # Basic task counts logger.warning(f"Active tasks in queue: {len(self.tasks)}") logger.warning(f"Sleeping tasks: {len(self.sleeping)}") logger.warning(f"Tasks waiting on network I/O: {len(self.waiting_on_network)}") # Count tasks waiting on synchronization primitives event_waiter_count = sum(len(waiters) for waiters in self.event_waiters.values()) lock_waiter_count = sum(len(waiters) for waiters in self.lock_waiters.values()) condition_waiter_count = sum(len(waiters) for waiters in self.condition_waiters.values()) logger.warning(f"Tasks waiting on events: {event_waiter_count}") logger.warning(f"Tasks waiting on locks: {lock_waiter_count}") logger.warning(f"Tasks waiting on conditions: {condition_waiter_count}") # Task details if self.tasks: logger.warning("=== ACTIVE TASKS ===") for i, (task, send_value, exception) in enumerate(self.tasks): logger.warning(f" Task {i}: {task}") if send_value is not None: logger.warning(f" Pending send value: {send_value}") if exception is not None: logger.warning(f" Pending exception: {exception}") # Sleeping tasks details if self.sleeping: logger.warning("=== SLEEPING TASKS ===") current_time = timer() for wake_time, task in self.sleeping[:5]: # Show first 5 sleeping tasks time_remaining = wake_time - current_time logger.warning(f" Task: {task}, wakes in {time_remaining:.3f}s") if len(self.sleeping) > 5: logger.warning( f" ... 
and {len(self.sleeping) - 5} more sleeping tasks" ) # Network I/O tasks if self.waiting_on_network: logger.warning("=== NETWORK I/O TASKS ===") for task in self.waiting_on_network[:5]: # Show first 5 network tasks logger.warning(f" Task: {task}") if len(self.waiting_on_network) > 5: logger.warning( f" ... and {len(self.waiting_on_network) - 5} more network tasks" ) # Event waiting tasks if event_waiter_count > 0: logger.warning("=== EVENT WAITING TASKS ===") shown = 0 for event, waiters in self.event_waiters.items(): for task in waiters: if shown >= 5: break logger.warning(f" Task: {task} (waiting on {event})") shown += 1 if shown >= 5: break if event_waiter_count > 5: logger.warning(f" ... and {event_waiter_count - 5} more event waiters") # Lock waiting tasks if lock_waiter_count > 0: logger.warning("=== LOCK WAITING TASKS ===") shown = 0 for lock, waiters in self.lock_waiters.items(): for task in waiters: if shown >= 5: break logger.warning(f" Task: {task} (waiting on {lock})") shown += 1 if shown >= 5: break if lock_waiter_count > 5: logger.warning(f" ... and {lock_waiter_count - 5} more lock waiters") # Condition waiting tasks if condition_waiter_count > 0: logger.warning("=== CONDITION WAITING TASKS ===") shown = 0 for condition, waiters in self.condition_waiters.items(): for task in waiters: if shown >= 5: break logger.warning(f" Task: {task} (waiting on {condition})") shown += 1 if shown >= 5: break if condition_waiter_count > 5: logger.warning(f" ... and {condition_waiter_count - 5} more condition waiters") # Task watching relationships watching_count = sum(len(watchers) for watchers in self.watching_task.values()) if watching_count > 0: logger.warning(f"=== TASK WATCHING ({watching_count} relationships) ===") count = 0 for watched_task, watchers in self.watching_task.items(): if count >= 5: # Limit output logger.warning( f" ... 
and {watching_count - count} more relationships" ) break if watchers: logger.warning(f" {watched_task} watched by {len(watchers)} tasks") count += len(watchers) # Finished and exception tasks logger.warning(f"Finished tasks: {len(self.finished)}") logger.warning(f"Tasks with exceptions: {len(self.exceptions)}") logger.warning(f"Cancelled tasks: {len(self.cancelled)}") logger.warning("=== END DEBUG INFO ===")
[docs] def _install_signal_handlers(self) -> None: """Install signal handlers for debugging interrupted event loops.""" if self._signal_handlers_installed: return def signal_handler(signum: int, frame: Any) -> None: sig_name = signal.Signals(signum).name self._dump_debug_info(f"Signal {sig_name} received") # Re-raise KeyboardInterrupt for SIGINT to maintain normal behavior if signum == signal.SIGINT: raise KeyboardInterrupt() # Install handlers for common interrupt signals try: signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Only install SIGUSR1 on Unix systems if hasattr(signal, "SIGUSR1"): signal.signal(signal.SIGUSR1, signal_handler) self._signal_handlers_installed = True logger.debug("Signal handlers installed for event loop debugging") except (OSError, ValueError) as e: # Signal handling might not be available in all contexts (e.g., threads) logger.debug(f"Could not install signal handlers: {e}")
[docs] def _uninstall_signal_handlers(self) -> None: """Restore default signal handlers.""" if not self._signal_handlers_installed: return try: signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL) if hasattr(signal, "SIGUSR1"): signal.signal(signal.SIGUSR1, signal.SIG_DFL) self._signal_handlers_installed = False logger.debug("Signal handlers uninstalled") except (OSError, ValueError) as e: logger.debug(f"Could not uninstall signal handlers: {e}")
[docs] def has_living_tasks(self) -> bool: """Return True if there are any tasks still needing processing.""" if self.tasks or self.sleeping or self.waiting_on_network: return True for _watched_task, watching_tasks in self.watching_task.items(): if watching_tasks: return True # Check synchronization primitive waiters if any(self.event_waiters.values()): return True if any(self.lock_waiters.values()): return True if any(self.condition_waiters.values()): return True return False
[docs] def create_task( self, raw_task: RawTask[Command, Any, Any], ) -> TaskHandle[Command]: """ Create a new task handle for the given raw task and enqueue the task in the event loop's task queue. Args: raw_task: The raw task to create a handle for. Returns: A TaskHandle object representing the created task. """ self.tasks.append((raw_task, None, None)) # If called from another thread, wake up the event loop if ( self._loop_thread is not None and threading.current_thread() != self._loop_thread ): try: self._wakeup_writer.send(b"\x00") except (BlockingIOError, socket.error): pass return TaskHandle(self, raw_task)
[docs] def _on_task_before_send( self, task: RawTask[Command, Any, Any], value: Any ) -> None: """ Hook called before sending a value to a task. Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation. Args: task: The task that will receive the value value: The value being sent to the task """ instrument = get_current_instrument() instrument.on_task_before_send(TaskSendMetadata(task=task, send_value=value))
[docs] def _on_task_after_send( self, task: RawTask[Command, Any, Any], value: Any, command: Command ) -> None: """ Hook called after successfully sending a value to a task. Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation. Args: task: The task that received the value value: The value that was sent to the task command: The command yielded by the task """ instrument = get_current_instrument() instrument.on_task_after_send(TaskSendMetadata(task=task, send_value=value), command)
[docs] def _on_task_before_throw( self, task: RawTask[Command, Any, Any], exception: Exception ) -> None: """ Hook called before throwing an exception into a task. Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation. Args: task: The task that will receive the exception exception: The exception being thrown into the task """ instrument = get_current_instrument() instrument.on_task_before_throw(TaskThrowMetadata(task=task, exception=exception))
[docs] def _on_task_after_throw( self, task: RawTask[Command, Any, Any], exception: Exception, command: Command ) -> None: """ Hook called after successfully throwing an exception into a task. Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation. Args: task: The task that received the exception exception: The exception that was thrown into the task command: The command yielded by the task """ instrument = get_current_instrument() instrument.on_task_after_throw(TaskThrowMetadata(task=task, exception=exception), command)
[docs] def _on_task_completed( self, task: RawTask[Command, Any, Any], result: Any ) -> None: """ Hook called when a task completes successfully. Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation. Args: task: The task that completed result: The return value of the task """ instrument = get_current_instrument() instrument.on_task_completed(task, result)
[docs] def _on_task_error( self, task: RawTask[Command, Any, Any], exception: Exception ) -> None: """ Hook called when a task raises an exception. Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation. Args: task: The task that raised the exception exception: The exception that was raised """ instrument = get_current_instrument() instrument.on_task_error(task, exception)
[docs] def _on_task_cancelled( self, task: RawTask[Command, Any, Any], exception: Exception ) -> None: """ Hook called when a task is cancelled. Subclasses can override this method to add custom behavior. The default implementation delegates to the current instrumentation. Args: task: The task that was cancelled exception: The TaskCancelled exception """ instrument = get_current_instrument() instrument.on_task_cancelled(task, exception)
[docs] def _handle_command( self, current_task_packet: TaskHandlePacket[Command, Any, Any, Exception], command: Command, ) -> bool: """ Handle the command yielded by the current task. Returns True if the command was successfully handled. """ if isinstance(command, SpawnCommand): command = cast(SpawnCommand[object], command) current_task_packet = cast( TaskHandlePacket[SpawnCommand[object], Any, Any, Exception], current_task_packet, ) new_task = TaskHandle[object](self, command.raw_task) self.tasks.append((command.raw_task, None, None)) self.tasks.append((current_task_packet[0], new_task, None)) elif isinstance(command, JoinCommand): command = cast(JoinCommand[object], command) current_task_packet = cast( TaskHandlePacket[JoinCommand[object], Any, Any, Exception], current_task_packet, ) if command.task_handle.is_finished: self.tasks.append( ( current_task_packet[0], self.finished[command.task_handle.raw_task], None, ) ) elif command.task_handle.is_error or command.task_handle.is_cancelled: self.tasks.append( ( current_task_packet[0], None, self.exceptions[command.task_handle.raw_task], ) ) else: # wait for the joined task to finish self.watching_task[command.task_handle.raw_task].append( current_task_packet[0] ) elif isinstance(command, CancelCommand): command = cast(CancelCommand[object], command) current_task_packet = cast( TaskHandlePacket[CancelCommand[object], Any, Any, Exception], current_task_packet, ) # First, check if the task is already finished if command.task_handle.is_finished: # Task already finished - return its result self.tasks.append( ( current_task_packet[0], self.finished[command.task_handle.raw_task], None, ) ) elif command.task_handle.is_error or command.task_handle.is_cancelled: # Task already has an error or is cancelled - return the exception self.tasks.append( ( current_task_packet[0], None, self.exceptions[command.task_handle.raw_task], ) ) elif command.task_handle.raw_task in self.pending_cancellation: # Cancellation already in progress 
(from event_loop.cancel() call) # Just add the canceller to the watching list to wait for completion self.watching_task[command.task_handle.raw_task].append( current_task_packet[0] ) # Track that this is a cancel waiter (not a join waiter) self.cancel_waiters[command.task_handle.raw_task].add( current_task_packet[0] ) else: # Task is still running - cancel it by throwing TaskCancelled # Mark as pending cancellation self.pending_cancellation.add(command.task_handle.raw_task) # Remove from all wait states so it can process cancellation immediately self._remove_task_from_wait_states(command.task_handle.raw_task) # Inject the exception into the target task self.tasks.append( ( command.task_handle.raw_task, None, TaskCancelled(command.task_handle), ) ) # Then, add the canceller to the watching list to wait for completion # (Don't add the canceller to task queue - it should wait) self.watching_task[command.task_handle.raw_task].append( current_task_packet[0] ) # Track that this is a cancel waiter (not a join waiter) self.cancel_waiters[command.task_handle.raw_task].add( current_task_packet[0] ) elif isinstance(command, SleepCommand): current_task_packet = cast( TaskHandlePacket[SleepCommand, None, DeltaTime, Exception], current_task_packet, ) if command.end_time <= timer(): self.tasks.append((current_task_packet[0], None, None)) else: heapq.heappush( self.sleeping, (command.end_time, current_task_packet[0]) ) elif isinstance(command, SocketAcceptCommand): current_task_packet = cast( TaskHandlePacket[SocketAcceptCommand, None, None, Exception], current_task_packet, ) metadata = InstrumentationMetadata( _task=current_task_packet[0], _command=command, socket_handle=command.handle, ) get_current_instrument().on_socket_accept_start(metadata) self.waiting_on_network.append(current_task_packet[0]) _ = sel.register(command.handle.socket, selectors.EVENT_READ, metadata) elif isinstance(command, SocketSendCommand): current_task_packet = cast( TaskHandlePacket[SocketSendCommand, 
None, None, Exception], current_task_packet, ) metadata = InstrumentationMetadata( _task=current_task_packet[0], _command=command, socket_handle=command.handle, ) get_current_instrument().on_socket_send_start(metadata) self.waiting_on_network.append(current_task_packet[0]) _ = sel.register(command.handle.socket, selectors.EVENT_WRITE, metadata) elif isinstance(command, SocketRecvCommand): current_task_packet = cast( TaskHandlePacket[SocketRecvCommand, None, None, Exception], current_task_packet, ) metadata = InstrumentationMetadata( _task=current_task_packet[0], _command=command, socket_handle=command.handle, ) get_current_instrument().on_socket_recv_start(metadata) self.waiting_on_network.append(current_task_packet[0]) _ = sel.register(command.handle.socket, selectors.EVENT_READ, metadata) elif isinstance(command, ExitCommand): # Handle the exit command # Mark the task as finished regardless of whether we're exiting normally or with an exception # This prevents "event loop exited without completing the root task" errors self.finished[current_task_packet[0]] = command.return_value # If there's an exception, raise it immediately (will be caught in run_until_complete) if command.exception is not None: raise command.exception else: # Set the exit flag with the return value and no exception self._exit_requested = (True, command.return_value, None) elif isinstance(command, EventWaitCommand): # Wait for an event to be set if command.event._set: # Event already set - immediate resume # This should not actually reach the event loop, but just in case self.tasks.append((current_task_packet[0], None, None)) else: # Event not set - block task self.event_waiters[command.event].add(current_task_packet[0]) elif isinstance(command, EventSetCommand): # Set event and wake all waiting tasks command.event._set = True for waiter in self.event_waiters[command.event]: self.tasks.append((waiter, None, None)) self.event_waiters[command.event].clear() 
self.tasks.append((current_task_packet[0], None, None)) elif isinstance(command, LockAcquireCommand): # Acquire lock (mutual exclusion) if not command.lock._locked: # Lock available - acquire immediately command.lock._locked = True command.lock._owner = current_task_packet[0] self.tasks.append((current_task_packet[0], None, None)) else: # Lock held - block in FIFO queue self.lock_waiters[command.lock].append(current_task_packet[0]) elif isinstance(command, LockReleaseCommand): # Release lock (must be owner, unless task was cancelled while waiting on condition) # When a task is cancelled while waiting on a condition, it doesn't own the lock # but the async with block will still try to release it in __aexit__ if command.lock._owner != current_task_packet[0]: # Task doesn't own the lock - this can happen when cancelled while # waiting on a condition. Just let the task continue without releasing. self.tasks.append((current_task_packet[0], None, None)) else: if self.lock_waiters[command.lock]: # Wake next waiter in FIFO order, transfer ownership waiter = self.lock_waiters[command.lock].popleft() command.lock._owner = waiter self.tasks.append((waiter, None, None)) else: # No waiters - unlock command.lock._locked = False command.lock._owner = None # Releaser continues self.tasks.append((current_task_packet[0], None, None)) elif isinstance(command, ConditionWaitCommand): # Wait on condition (atomically releases lock) # Must hold lock before calling wait assert command.condition._lock._owner == current_task_packet[0], \ f"Condition wait by non-owner: {current_task_packet[0]} != {command.condition._lock._owner}" # Atomically: add to condition waiters, release lock self.condition_waiters[command.condition].add(current_task_packet[0]) # Release the lock and wake next lock waiter if any if self.lock_waiters[command.condition._lock]: waiter = self.lock_waiters[command.condition._lock].popleft() command.condition._lock._owner = waiter self.tasks.append((waiter, None, None)) else: 
command.condition._lock._locked = False command.condition._lock._owner = None elif isinstance(command, ConditionNotifyCommand): # Notify waiters on condition (must hold lock) assert command.condition._lock._owner == current_task_packet[0], \ f"Condition notify by non-owner: {current_task_packet[0]} != {command.condition._lock._owner}" if command.all: # notify_all: move all condition waiters to lock waiters for waiter in self.condition_waiters[command.condition]: self.lock_waiters[command.condition._lock].append(waiter) self.condition_waiters[command.condition].clear() else: # notify: move one condition waiter to lock waiters if self.condition_waiters[command.condition]: waiter = self.condition_waiters[command.condition].pop() self.lock_waiters[command.condition._lock].append(waiter) # Notifier continues self.tasks.append((current_task_packet[0], None, None)) else: return False return True
[docs] def _remove_task_from_wait_states(self, raw_task: RawTask[Command, Any, Any]) -> None: """ Remove a task from all wait states (sleeping, network, synchronization primitives). This is used during task cancellation to ensure the task is no longer waiting on any resource and can be immediately scheduled to process the cancellation. Args: raw_task: The task to remove from wait states. """ # Remove from sleeping heap if present self.sleeping = [ (wake_time, task) for wake_time, task in self.sleeping if task != raw_task ] heapq.heapify(self.sleeping) # Remove from network waiting list if it's there if raw_task in self.waiting_on_network: self.waiting_on_network.remove(raw_task) # Also need to unregister from selector # Find and unregister the socket associated with this task for key in list(sel.get_map().values()): metadata = cast(InstrumentationMetadata, key.data) if metadata._task == raw_task: sel.unregister(key.fileobj) break # Remove from synchronization primitive waiters if present # (event_waiters, lock_waiters, condition_waiters) for event, waiters in self.event_waiters.items(): if raw_task in waiters: waiters.discard(raw_task) break for lock, waiters in self.lock_waiters.items(): if raw_task in waiters: waiters.remove(raw_task) break for condition, waiters in self.condition_waiters.items(): if raw_task in waiters: waiters.discard(raw_task) break
[docs] def cancel(self, raw_task: RawTask[Command, Any, Any]) -> bool: """ Cancel a task. Args: raw_task: The task to cancel. Returns: True if the task was successfully cancelled; False if it was already finished or errored. """ if raw_task in self.finished or raw_task in self.exceptions: return False # Check if cancellation is already pending if raw_task in self.pending_cancellation: return True # Mark cancellation as pending self.pending_cancellation.add(raw_task) # Remove the task from all wait states so it can process cancellation immediately self._remove_task_from_wait_states(raw_task) # Inject the cancellation exception self.tasks.append((raw_task, None, TaskCancelled(TaskHandle(self, raw_task)))) return True
@overload def run_until_complete( self, root_task: RawTask[Command, Any, _ReturnT], join: Literal[False] = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None, ) -> None: ... @overload def run_until_complete( self, root_task: RawTask[Command, Any, _ReturnT], join: bool = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None, ) -> _ReturnT: ...
[docs] def run_until_complete( self, root_task: RawTask[Command, Any, _ReturnT], join: bool = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None, ) -> _ReturnT | None: """ Run the event loop until the given root task is complete. This method executes the main event loop, processing tasks, handling I/O operations, and managing task synchronization until the root task completes. It can optionally wait for all spawned tasks to finish as well. Args: root_task (RawTask[Command, Any, _ReturnT]): The coroutine task to execute as the root of the execution graph. join (bool): When True, returns the result value of the root task. When False, returns None regardless of the task's result. If the task raises an exception and join=True, the exception is re-raised. wait_for_spawned_tasks (bool): When True, continue running the event loop until all tasks spawned by the root task have completed. When False, stop as soon as the root task completes. _debug_max_wait_time (float | None): Optional timeout value in seconds used for debugging. Limits how long the event loop will wait for network or sleeping operations. Returns: _ReturnT | None: If join=True, returns the result of the root task (of type _ReturnT). If join=False, returns None. Raises: RuntimeError: If the event loop exits without completing the root task when join=True. Exception: Any exception raised by the root task is propagated if join=True. """ self._install_signal_handlers() try: return_value = self._run_event_loop_core( root_task, join=join, wait_for_spawned_tasks=wait_for_spawned_tasks, _debug_max_wait_time=_debug_max_wait_time, ) self._uninstall_signal_handlers() except: self._uninstall_signal_handlers() raise return return_value
def _run_event_loop_core( self, root_task: RawTask[Command, Any, _ReturnT], join: bool = False, wait_for_spawned_tasks: bool = True, _debug_max_wait_time: float | None = None, ): global _current_event_loop _current_event_loop = self try: self._debug_max_wait_time = _debug_max_wait_time self._loop_thread = threading.current_thread() self._exit_requested = (False, None, None) # Reset exit flag self.tasks.append((root_task, None, None)) # Register wakeup socket with selector # Blank metadata for wakeup socket. Never used. metadata = InstrumentationMetadata( _task=None, _command=None, socket_handle=None ) sel.register(self._wakeup_reader, selectors.EVENT_READ, metadata) while self.has_living_tasks(): # Check if exit was requested exit_requested, exit_value, exit_exception = self._exit_requested if exit_requested: # Handle any requested exit if exit_exception is not None: raise exit_exception elif join: return cast(_ReturnT, exit_value) else: return None # Determine the timeout for selector based on tasks and sleeping tasks. if self.tasks: timeout = 0 elif self.sleeping: timeout = self.sleeping[0][0] - timer() if ( self._debug_max_wait_time is not None and timeout > self._debug_max_wait_time ): logger.error( f"Sleeping task timeout {timeout} exceeds max wait time {_debug_max_wait_time}." 
) timeout = self._debug_max_wait_time else: timeout = self._debug_max_wait_time for key, _mask in sel.select(timeout): data = cast(InstrumentationMetadata, key.data) if key.fileobj == self._wakeup_reader: # Clear wakeup signal try: self._wakeup_reader.recv(1024) except (BlockingIOError, socket.error): pass continue # Skip further processing for wakeup socket # Handle regular socket commands if data._command is None: # Skip processing if no command (shouldn't happen for regular sockets) continue match data._command: case SocketAcceptCommand(): get_current_instrument().on_socket_accept_ready( ReadySocketInstrumentationMetadata.from_instrumentation_metadata( data ) ) case SocketRecvCommand(): get_current_instrument().on_socket_recv_ready( ReadySocketInstrumentationMetadata.from_instrumentation_metadata( data ) ) case SocketSendCommand(): get_current_instrument().on_socket_send_ready( ReadySocketInstrumentationMetadata.from_instrumentation_metadata( data ) ) case _: raise ValueError( f"Unknown selector command data type: {type(data._command)}" ) self.tasks.append((data._task, None, None)) _ = sel.unregister(key.fileobj) self.waiting_on_network.remove(data._task) while self.sleeping and self.sleeping[0][0] <= timer(): _, task = heapq.heappop(self.sleeping) self.tasks.append((task, None, None)) if self.tasks: task_packet = self.tasks.popleft() # Set the current task before executing token = _current_task.set(task_packet[0]) try: if task_packet[2] is not None: self._on_task_before_throw(task_packet[0], task_packet[2]) command = task_packet[0].throw(task_packet[2]) self._on_task_after_throw(task_packet[0], task_packet[2], command) else: self._on_task_before_send(task_packet[0], task_packet[1]) command = task_packet[0].send(task_packet[1]) self._on_task_after_send(task_packet[0], task_packet[1], command) except StopIteration as e: returned_value = cast(object, e.value) self.finished[task_packet[0]] = returned_value self._on_task_completed(task_packet[0], returned_value) for 
watcher in self.watching_task[task_packet[0]]: self.tasks.append((watcher, returned_value, None)) del self.watching_task[task_packet[0]] # Clean up cancel waiters and pending cancellation tracking if task_packet[0] in self.cancel_waiters: del self.cancel_waiters[task_packet[0]] self.pending_cancellation.discard(task_packet[0]) if task_packet[0] == root_task and not wait_for_spawned_tasks: return cast(_ReturnT, returned_value) if join else None except TaskCancelled as e: self.cancelled.add(task_packet[0]) self.exceptions[task_packet[0]] = e self._on_task_cancelled(task_packet[0], e) # Resume watchers - cancel waiters get None, join waiters get the exception for watcher in self.watching_task[task_packet[0]]: if ( task_packet[0] in self.cancel_waiters and watcher in self.cancel_waiters[task_packet[0]] ): # This is a cancel waiter - successful cancellation returns None self.tasks.append((watcher, None, None)) else: # This is a join waiter - propagate the exception self.tasks.append((watcher, None, e)) del self.watching_task[task_packet[0]] # Clean up cancel waiters and pending cancellation tracking if task_packet[0] in self.cancel_waiters: del self.cancel_waiters[task_packet[0]] self.pending_cancellation.discard(task_packet[0]) if task_packet[0] == root_task and not wait_for_spawned_tasks: if join: raise e else: return except Exception as e: logger.exception(f"Task {task_packet[0]} raised an exception: {e}") self.exceptions[task_packet[0]] = e self._on_task_error(task_packet[0], e) for watcher in self.watching_task[task_packet[0]]: self.tasks.append((watcher, None, e)) del self.watching_task[task_packet[0]] # Clean up cancel waiters and pending cancellation tracking if task_packet[0] in self.cancel_waiters: del self.cancel_waiters[task_packet[0]] self.pending_cancellation.discard(task_packet[0]) if task_packet[0] == root_task and not wait_for_spawned_tasks: if join: raise e else: return else: _ = self._handle_command(task_packet, command) finally: # Reset the context 
after task execution _current_task.reset(token) if join and root_task in self.finished: return cast(_ReturnT, self.finished[root_task]) elif join and root_task in self.exceptions: raise self.exceptions[root_task] elif join: raise RuntimeError("Event loop exited without completing the root task.") finally: _current_event_loop = None