Source code for flowno.core.event_loop.instrumentation

"""
Event Loop Instrumentation System.

This module provides a flexible instrumentation system for monitoring and debugging
the Flowno event loop. It captures metrics and events for socket operations, queue
operations, and other event loop activities.

The instrumentation system uses a context manager pattern, allowing multiple
instrumentation hooks to be applied within different scopes of code.

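Example:
    >>> from flowno.core.event_loop.event_loop import EventLoop
    >>> from flowno.core.event_loop.instrumentation import EventLoopInstrument
    >>> from flowno.core.event_loop.queues import AsyncQueue
    >>>
    >>> # PrintInstrument only reports socket and TLS events, so queue events
    >>> # are shown here with a small subclass that overrides the queue hooks
    >>> class QueuePrintInstrument(EventLoopInstrument):
    ...     def on_queue_put(self, queue, item, immediate):
    ...         print(f"[QUEUE-PUT] Item put in queue: {item}")
    ...     def on_queue_get(self, queue, item, immediate):
    ...         print(f"[QUEUE-GET] Item retrieved from queue: {item}")
    >>>
    >>> # Define a simple coroutine that uses a queue
    >>> async def queue_example():
    ...     queue = AsyncQueue()
    ...     # Put an item into the queue
    ...     await queue.put(42)
    ...     # Get the item back from the queue
    ...     value = await queue.get()
    ...     return value
    >>>
    >>> # Create an event loop and run with instrumentation
    >>> event_loop = EventLoop()
    >>> with QueuePrintInstrument():
    ...     result = event_loop.run_until_complete(queue_example(), join=True)
    ...
    [QUEUE-PUT] Item put in queue: 42
    [QUEUE-GET] Item retrieved from queue: 42
    >>> print(result)
    42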
"""

from __future__ import annotations

from contextvars import ContextVar, Token
from dataclasses import dataclass, field
from time import time
from types import TracebackType
from typing import TYPE_CHECKING, Any, Final, TypeVar

from typing_extensions import Self, override

if TYPE_CHECKING:
    from flowno.core.event_loop.commands import Command
    from flowno.core.event_loop.queues import AsyncQueue
    from flowno.core.event_loop.selectors import SocketHandle
    from flowno.core.event_loop.types import RawTask, _Address

import logging

# Module-level logger; LogInstrument sends its messages to logger.debug.
logger = logging.getLogger(__name__)

# Generic item type used in the queue hook signatures.
T = TypeVar("T")

# Context-local stack of active instruments. Entering an instrument stores a
# new list with that instrument appended; the default of None means that no
# instrument is active.
_instrument_stack: ContextVar[list["EventLoopInstrument"] | None] = ContextVar("_instrument_stack", default=None)


@dataclass
class InstrumentationMetadata:
    """Common payload passed to the socket start/close instrumentation hooks.

    ``start_time`` defaults to ``time.time()`` evaluated when the instance is
    created, so it does not have to be supplied by the caller.
    """

    # Presumably the task that issued the socket command -- confirm with caller.
    _task: "RawTask[Any, None, None]"
    # The command associated with this event.
    _command: "Command"
    # Handle of the socket the event refers to.
    socket_handle: "SocketHandle"
    # NOTE(review): looks like "completed without blocking" -- confirm.
    immediate: bool = False
    # Wall-clock timestamp (seconds since the epoch) taken at construction.
    start_time: float = field(default_factory=time)
@dataclass
class ReadySocketInstrumentationMetadata(InstrumentationMetadata):
    """Socket metadata extended with the time at which the operation finished.

    ``finish_time`` defaults to ``time.time()`` evaluated at construction.
    """

    finish_time: float = field(default_factory=time)

    @classmethod
    def from_instrumentation_metadata(
        cls, metadata: InstrumentationMetadata, **kwargs: Any
    ) -> "ReadySocketInstrumentationMetadata":
        """Build a "ready" record from the metadata captured at the start.

        Args:
            metadata: The start-of-operation metadata whose fields are copied.
            **kwargs: Extra constructor arguments (for example ``finish_time``).
                Repeating one of the copied fields raises ``TypeError``.

        Returns:
            A new instance carrying the copied fields plus ``kwargs``.
        """
        copied = {
            "_task": metadata._task,
            "_command": metadata._command,
            "socket_handle": metadata.socket_handle,
            "immediate": metadata.immediate,
            "start_time": metadata.start_time,
        }
        return cls(**copied, **kwargs)
@dataclass
class SocketRecvDataMetadata:
    """Payload for the hook that reports bytes read from a socket."""

    # Handle of the socket the bytes were read from.
    socket_handle: "SocketHandle"
    # The received bytes.
    data: bytes
@dataclass
class SocketSendDataMetadata:
    """Payload for the hook that reports bytes written to a socket."""

    # Handle of the socket the bytes were written to.
    socket_handle: "SocketHandle"
    # The bytes that were offered for sending.
    data: bytes
    # Actual bytes accepted by kernel (may differ from len(data))
    bytes_sent: int
@dataclass
class SocketConnectStartMetadata:
    """Payload describing the start of a socket connection attempt.

    ``start_time`` defaults to ``time.time()`` evaluated at construction.
    """

    # Handle of the connecting socket.
    socket_handle: "SocketHandle"
    # Address the socket is connecting to.
    target_address: "_Address"
    # NOTE(review): looks like "completed without blocking" -- confirm.
    immediate: bool = False
    # Wall-clock timestamp (seconds since the epoch) taken at construction.
    start_time: float = field(default_factory=time)
@dataclass
class SocketConnectReadyMetadata(SocketConnectStartMetadata):
    """Connection metadata extended with the time the connection completed.

    ``finish_time`` defaults to ``time.time()`` evaluated at construction.
    """

    finish_time: float = field(default_factory=time)

    @classmethod
    def from_instrumentation_metadata(
        cls, metadata: SocketConnectStartMetadata, **kwargs: Any
    ) -> "SocketConnectReadyMetadata":
        """Build a "ready" record from the metadata captured at connect start.

        Args:
            metadata: The connect-start metadata whose fields are copied.
            **kwargs: Extra constructor arguments (for example ``finish_time``).
                Repeating one of the copied fields raises ``TypeError``.

        Returns:
            A new instance carrying the copied fields plus ``kwargs``.
        """
        copied = {
            "socket_handle": metadata.socket_handle,
            "target_address": metadata.target_address,
            "immediate": metadata.immediate,
            "start_time": metadata.start_time,
        }
        return cls(**copied, **kwargs)
[docs] @dataclass class TLSHandshakeMetadata: """Metadata for completed TLS handshakes.""" socket_handle: "SocketHandle" cipher: tuple[str, str, int] | None # (cipher_name, protocol, bits) version: str | None # e.g., "TLSv1.3" server_hostname: str | None start_time: float finish_time: float = field(default_factory=time)
@dataclass
class TaskSendMetadata:
    """Payload for the hooks that fire around a task's ``send()`` call."""

    # The task whose send() is being invoked.
    task: "RawTask[Command, Any, Any]"
    # The value passed to send().
    send_value: Any
@dataclass
class TaskThrowMetadata:
    """Payload for the hooks that fire around a task's ``throw()`` call."""

    # The task whose throw() is being invoked.
    task: "RawTask[Command, Any, Any]"
    # The exception passed to throw().
    exception: Exception
class EventLoopInstrument:
    """
    Base class for event loop instrumentation.

    This class provides hooks for various event loop operations. Every hook is
    a no-op here; subclasses override the ones they need to implement custom
    monitoring or logging.

    Instruments are activated as context managers and can be nested. When
    nested, all active instruments receive events, with inner instruments
    firing before outer ones. The same instance may be entered more than once;
    each exit undoes the most recent entry that is still active.
    """

    # Token of the most recent still-active __enter__ (None when inactive).
    _token: Token[list["EventLoopInstrument"] | None] | None = None
    # Per-instance LIFO of tokens, one per active __enter__. It is created
    # lazily so subclasses are not required to call a base __init__.
    _tokens: list[Token[list["EventLoopInstrument"] | None]] | None = None

    def on_queue_get(self, queue: "AsyncQueue[T]", item: T, immediate: bool) -> None:
        """
        Called every time a queue completes a get command.

        If the task blocks on an empty queue, this callback fires when an item
        is available and returned.

        Args:
            queue (AsyncQueue[T]): The queue from which the item was popped.
            item (T): The item that was popped from the queue.
            immediate (bool): Presumably True when the get completed without
                the task having to block -- confirm against the caller.
        """
        pass

    def on_queue_put(self, queue: "AsyncQueue[T]", item: T, immediate: bool) -> None:
        """
        Called every time a queue processes a put command.

        If the task blocks on a full queue, this callback fires when the queue
        accepts the item.

        Args:
            queue (AsyncQueue[T]): The queue which the item was added to.
            item (T): The item that was added to the queue.
            immediate (bool): Presumably True when the put completed without
                the task having to block -- confirm against the caller.
        """
        pass

    def on_socket_connect_start(self, metadata: SocketConnectStartMetadata) -> None:
        """
        Called when a socket connection is initiated.

        Args:
            metadata: Connection metadata including target address
        """
        pass

    def on_socket_connect_ready(self, metadata: SocketConnectReadyMetadata) -> None:
        """
        Called when a socket connection has been established.

        Args:
            metadata: Connection metadata including duration and target address
        """
        pass

    def on_socket_recv_start(self, metadata: InstrumentationMetadata) -> None:
        """
        Called when a task starts waiting on a socket recv.

        Args:
            metadata: Socket operation metadata
        """
        pass

    def on_socket_recv_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
        """
        Called when the socket is ready for reading.

        Args:
            metadata: Socket operation metadata including duration
        """
        pass

    def on_socket_recv_data(self, metadata: SocketRecvDataMetadata) -> None:
        """
        Called immediately after the actual bytes have been read.

        Args:
            metadata: Metadata including the received bytes
        """
        pass

    def on_socket_send_data(self, metadata: "SocketSendDataMetadata") -> None:
        """
        Called immediately after bytes have been sent to the kernel.

        Args:
            metadata: Metadata including the sent bytes and actual bytes_sent count
        """
        pass

    def on_tls_handshake_complete(self, metadata: "TLSHandshakeMetadata") -> None:
        """
        Called after a TLS handshake completes successfully.

        Args:
            metadata: Metadata including cipher info and TLS version
        """
        pass

    def on_socket_send_start(self, metadata: InstrumentationMetadata) -> None:
        """
        Called when a task starts waiting on a socket send.

        Args:
            metadata: Socket operation metadata
        """
        pass

    def on_socket_send_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
        """
        Called when the socket send operation completes.

        Args:
            metadata: Socket operation metadata including duration
        """
        pass

    def on_socket_accept_start(self, metadata: InstrumentationMetadata) -> None:
        """
        Called when a task starts waiting on a socket accept.

        Args:
            metadata: Socket operation metadata
        """
        pass

    def on_socket_accept_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
        """
        Called when a socket accept operation completes.

        Args:
            metadata: Socket operation metadata including duration
        """
        pass

    def on_socket_close(self, metadata: InstrumentationMetadata) -> None:
        """
        Called when a socket connection is closed.

        Args:
            metadata: Socket operation metadata
        """
        pass

    def on_task_before_send(self, metadata: TaskSendMetadata) -> None:
        """
        Called before a task's send() method is invoked.

        Args:
            metadata: Task send metadata including the task and value being sent
        """
        pass

    def on_task_after_send(self, metadata: TaskSendMetadata, command: "Command") -> None:
        """
        Called after a task's send() method completes successfully.

        Args:
            metadata: Task send metadata including the task and value that was sent
            command: The command yielded by the task
        """
        pass

    def on_task_before_throw(self, metadata: TaskThrowMetadata) -> None:
        """
        Called before a task's throw() method is invoked.

        Args:
            metadata: Task throw metadata including the task and exception being thrown
        """
        pass

    def on_task_after_throw(self, metadata: TaskThrowMetadata, command: "Command") -> None:
        """
        Called after a task's throw() method completes successfully.

        Args:
            metadata: Task throw metadata including the task and exception that was thrown
            command: The command yielded by the task
        """
        pass

    def on_task_completed(self, task: "RawTask[Command, Any, Any]", result: Any) -> None:
        """
        Called when a task completes successfully (StopIteration).

        Args:
            task: The task that completed
            result: The return value of the task
        """
        pass

    def on_task_error(self, task: "RawTask[Command, Any, Any]", exception: Exception) -> None:
        """
        Called when a task raises an exception.

        Args:
            task: The task that raised the exception
            exception: The exception that was raised
        """
        pass

    def on_task_cancelled(self, task: "RawTask[Command, Any, Any]", exception: Exception) -> None:
        """
        Called when a task is cancelled (TaskCancelled exception).

        Args:
            task: The task that was cancelled
            exception: The TaskCancelled exception
        """
        pass

    def __enter__(self: Self) -> Self:
        """
        Activate this instrument in the current context.

        Returns:
            This instrument, so it can be bound with ``with ... as name``.
        """
        stack = _instrument_stack.get() or []
        # Publish a new list instead of mutating the current one, so resetting
        # the token restores exactly the stack that was active before.
        token = _instrument_stack.set(stack + [self])
        if self._tokens is None:
            self._tokens = []
        # One token per entry: a single slot would be overwritten when the same
        # instance is entered again, and the outer exit would then try to reset
        # an already-used token, which ContextVar.reset rejects.
        self._tokens.append(token)
        self._token = token
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """
        Deactivate the most recent activation of this instrument.

        Exiting an instrument that is not active is a no-op.

        Returns:
            Always False, so exceptions raised inside the ``with`` body propagate.
        """
        if self._tokens:
            _instrument_stack.reset(self._tokens.pop())
            self._token = self._tokens[-1] if self._tokens else None
        return False
class PrintInstrument(EventLoopInstrument):
    """
    Event loop instrument that prints information to stdout.

    This instrument outputs detailed information about socket operations and
    TLS handshakes. The queue, socket-accept and task hooks are not overridden
    here, so those events produce no output.
    """

    # Output sink used by every hook; subclasses may replace it with another
    # callable that accepts a single message string.
    print = print

    def _get_peer_info(self, sock_handle: SocketHandle) -> tuple[str, int]:
        """
        Return the peer address of the handle's socket for display purposes.

        Args:
            sock_handle: Handle whose ``socket.getpeername()`` is queried.

        Returns:
            The peer address, or ``("<unknown>", 0)`` when it cannot be
            determined.
        """
        try:
            return sock_handle.socket.getpeername()
        except (OSError, AttributeError):
            # A diagnostic message must not abort the event loop just because
            # the peer is unavailable, so report a placeholder instead.
            return ("<unknown>", 0)

    @override
    def on_socket_connect_start(self, metadata: SocketConnectStartMetadata) -> None:
        """Print the address a connection attempt is targeting."""
        peer = metadata.target_address
        self.print(f"[CONNECT] Connecting to {peer}.")

    @override
    def on_socket_connect_ready(self, metadata: SocketConnectReadyMetadata) -> None:
        """Print the peer, elapsed time and local address of a new connection."""
        duration = metadata.finish_time - metadata.start_time
        peer = self._get_peer_info(metadata.socket_handle)
        self.print(
            f"[CONNECT] Connected to {peer} in {duration:.4f}s via {metadata.socket_handle.socket.getsockname()}"
        )

    @override
    def on_socket_send_start(self, metadata: InstrumentationMetadata) -> None:
        """Print that a send to the peer is starting."""
        peer = self._get_peer_info(metadata.socket_handle)
        self.print(f"[SEND] Starting send to {peer}")

    @override
    def on_socket_send_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
        """Print the peer and elapsed time of a completed send."""
        duration = metadata.finish_time - metadata.start_time
        peer = self._get_peer_info(metadata.socket_handle)
        self.print(f"[SEND] Completed send to {peer} in {duration:.4f}s")

    @override
    def on_socket_recv_start(self, metadata: InstrumentationMetadata) -> None:
        """Print that a task is waiting for data from the peer."""
        peer = self._get_peer_info(metadata.socket_handle)
        self.print(f"[RECV] Waiting for data from {peer}")

    @override
    def on_socket_recv_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
        """Print the peer and the time spent waiting until data was readable."""
        duration = metadata.finish_time - metadata.start_time
        peer = self._get_peer_info(metadata.socket_handle)
        self.print(f"[RECV] Ready to receive data from {peer} in {duration:.4f}s")

    @override
    def on_socket_close(self, metadata: InstrumentationMetadata) -> None:
        """Print that the connection to the peer was closed."""
        peer = self._get_peer_info(metadata.socket_handle)
        self.print(f"[CLOSE] Connection closed to {peer}")

    @override
    def on_socket_recv_data(self, metadata: SocketRecvDataMetadata) -> None:
        """Print the size, a 50-byte preview and the full received payload."""
        chunk_len = len(metadata.data)
        # Maybe show the first 50 bytes if it's textual data:
        chunk_preview = metadata.data[:50].decode("utf-8", errors="replace")
        self.print(f"[RECV-DATA] Got {chunk_len} bytes: {chunk_preview!r}")
        # Decode leniently, like the preview: received data may be binary, and
        # a strict decode would raise UnicodeDecodeError from inside the hook.
        full_text = metadata.data.decode("utf-8", errors="replace")
        self.print(f"[RECV-DATA] FULL DATA:\n{full_text!r}")

    @override
    def on_socket_send_data(self, metadata: SocketSendDataMetadata) -> None:
        """Print how many of the offered bytes were sent, with a preview."""
        data_len = len(metadata.data)
        bytes_sent = metadata.bytes_sent
        # Show first 50 bytes if textual
        data_preview = metadata.data[:50].decode("utf-8", errors="replace")
        self.print(f"[SEND-DATA] Sent {bytes_sent}/{data_len} bytes: {data_preview!r}")

    @override
    def on_tls_handshake_complete(self, metadata: TLSHandshakeMetadata) -> None:
        """Print the duration, TLS version and cipher name of a handshake."""
        duration = metadata.finish_time - metadata.start_time
        cipher_info = metadata.cipher[0] if metadata.cipher else "unknown"
        self.print(
            f"[TLS] Handshake complete in {duration:.4f}s: {metadata.version}, {cipher_info}"
        )
class LogInstrument(PrintInstrument):
    """
    Event loop instrument that writes its messages to the module logger.

    Every message is emitted at the debug log level.
    """

    # staticmethod makes it explicit that the sink is used as-is: attribute
    # access on an instance yields logger.debug itself.
    print = staticmethod(logger.debug)
class _CompositeInstrument(EventLoopInstrument):
    """
    Internal fan-out wrapper around several active instruments.

    When multiple instruments are active (nested context managers), this class
    wraps them and forwards each event to all of them in reverse order
    (inner-most first, then outer).
    """

    def __init__(self, instruments: list[EventLoopInstrument]):
        # Kept in activation order (outer-most first); iterated in reverse.
        self._instruments = instruments

    def _fan_out(self, hook_name: str, *args: Any) -> None:
        """Call the hook named ``hook_name`` on every instrument, inner-most first."""
        for target in reversed(self._instruments):
            getattr(target, hook_name)(*args)

    @override
    def on_queue_get(self, queue: "AsyncQueue[T]", item: T, immediate: bool) -> None:
        self._fan_out("on_queue_get", queue, item, immediate)

    @override
    def on_queue_put(self, queue: "AsyncQueue[T]", item: T, immediate: bool) -> None:
        self._fan_out("on_queue_put", queue, item, immediate)

    @override
    def on_socket_connect_start(self, metadata: SocketConnectStartMetadata) -> None:
        self._fan_out("on_socket_connect_start", metadata)

    @override
    def on_socket_connect_ready(self, metadata: SocketConnectReadyMetadata) -> None:
        self._fan_out("on_socket_connect_ready", metadata)

    @override
    def on_socket_recv_start(self, metadata: InstrumentationMetadata) -> None:
        self._fan_out("on_socket_recv_start", metadata)

    @override
    def on_socket_recv_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
        self._fan_out("on_socket_recv_ready", metadata)

    @override
    def on_socket_recv_data(self, metadata: SocketRecvDataMetadata) -> None:
        self._fan_out("on_socket_recv_data", metadata)

    @override
    def on_socket_send_data(self, metadata: SocketSendDataMetadata) -> None:
        self._fan_out("on_socket_send_data", metadata)

    @override
    def on_tls_handshake_complete(self, metadata: TLSHandshakeMetadata) -> None:
        self._fan_out("on_tls_handshake_complete", metadata)

    @override
    def on_socket_send_start(self, metadata: InstrumentationMetadata) -> None:
        self._fan_out("on_socket_send_start", metadata)

    @override
    def on_socket_send_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
        self._fan_out("on_socket_send_ready", metadata)

    @override
    def on_socket_accept_start(self, metadata: InstrumentationMetadata) -> None:
        self._fan_out("on_socket_accept_start", metadata)

    @override
    def on_socket_accept_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
        self._fan_out("on_socket_accept_ready", metadata)

    @override
    def on_socket_close(self, metadata: InstrumentationMetadata) -> None:
        self._fan_out("on_socket_close", metadata)

    @override
    def on_task_before_send(self, metadata: TaskSendMetadata) -> None:
        self._fan_out("on_task_before_send", metadata)

    @override
    def on_task_after_send(self, metadata: TaskSendMetadata, command: "Command") -> None:
        self._fan_out("on_task_after_send", metadata, command)

    @override
    def on_task_before_throw(self, metadata: TaskThrowMetadata) -> None:
        self._fan_out("on_task_before_throw", metadata)

    @override
    def on_task_after_throw(self, metadata: TaskThrowMetadata, command: "Command") -> None:
        self._fan_out("on_task_after_throw", metadata, command)

    @override
    def on_task_completed(self, task: "RawTask[Command, Any, Any]", result: Any) -> None:
        self._fan_out("on_task_completed", task, result)

    @override
    def on_task_error(self, task: "RawTask[Command, Any, Any]", exception: Exception) -> None:
        self._fan_out("on_task_error", task, exception)

    @override
    def on_task_cancelled(self, task: "RawTask[Command, Any, Any]", exception: Exception) -> None:
        self._fan_out("on_task_cancelled", task, exception)
# Shared instance of the base class, whose hooks all do nothing; returned by
# get_current_instrument() when no instrument is active.
EMPTY_INSTRUMENT: Final[EventLoopInstrument] = EventLoopInstrument()
def get_current_instrument() -> EventLoopInstrument:
    """
    Return the instrument that should receive events in the current context.

    Returns:
        ``EMPTY_INSTRUMENT`` when nothing is active, the instrument itself when
        exactly one is active, and otherwise a composite that dispatches to all
        active instruments (inner-most first).
    """
    active = _instrument_stack.get()
    if active:
        # A lone instrument is returned directly; no wrapper is needed.
        return active[0] if len(active) == 1 else _CompositeInstrument(active)
    return EMPTY_INSTRUMENT
# Public API of this module.
__all__ = [
    "EventLoopInstrument",
    "PrintInstrument",
    "LogInstrument",
    "InstrumentationMetadata",
    "ReadySocketInstrumentationMetadata",
    "SocketRecvDataMetadata",
    "SocketSendDataMetadata",
    "SocketConnectStartMetadata",
    "SocketConnectReadyMetadata",
    "TLSHandshakeMetadata",
    "TaskSendMetadata",
    "TaskThrowMetadata",
    "get_current_instrument",
]