"""
Event Loop Instrumentation System.
This module provides a flexible instrumentation system for monitoring and debugging
the Flowno event loop. It captures metrics and events for socket operations, queue
operations, and other event loop activities.
The instrumentation system uses a context manager pattern, allowing multiple
instrumentation hooks to be applied within different scopes of code.
Example:
>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.instrumentation import PrintInstrument
>>> from flowno.core.event_loop.queues import AsyncQueue
>>>
>>> # Define a simple coroutine that uses a queue
>>> async def queue_example():
... queue = AsyncQueue()
... # Put an item into the queue
... await queue.put(42)
... # Get the item back from the queue
... value = await queue.get()
... return value
>>>
>>> # Create an event loop and run with instrumentation
>>> event_loop = EventLoop()
>>> with PrintInstrument():
... result = event_loop.run_until_complete(queue_example(), join=True)
...
[QUEUE-PUT] Item put in queue: 42
[QUEUE-GET] Item retrieved from queue: 42
>>> print(result)
42
"""
from __future__ import annotations
from contextvars import ContextVar, Token
from dataclasses import dataclass, field
from time import time
from types import TracebackType
from typing import TYPE_CHECKING, Any, Final, TypeVar
from typing_extensions import Self, override
if TYPE_CHECKING:
from flowno.core.event_loop.commands import Command
from flowno.core.event_loop.queues import AsyncQueue
from flowno.core.event_loop.selectors import SocketHandle
from flowno.core.event_loop.types import RawTask, _Address
import logging
logger = logging.getLogger(__name__)
T = TypeVar("T")
_instrument_stack: ContextVar[list["EventLoopInstrument"] | None] = ContextVar("_instrument_stack", default=None)
[docs]
class EventLoopInstrument:
"""
Base class for event loop instrumentation.
This class provides hooks for various event loop operations. Subclasses can
override these methods to implement custom monitoring or logging.
Instruments can be nested using context managers. When nested, all active
instruments receive events, with inner instruments firing before outer ones.
"""
_token: Token[list["EventLoopInstrument"]] | None = None
[docs]
def on_queue_get(self, queue: "AsyncQueue[T]", item: T, immediate: bool) -> None:
"""
Called every time a queue completes a get command.
If the task blocks on an empty queue, this callback fires when an items
is available and returned.
Args:
queue (AsyncQueue[T]): The queue which the item was popped.
item (T): The item that was popped from the queue.
"""
pass
[docs]
def on_queue_put(self, queue: "AsyncQueue[T]", item: T, immediate: bool) -> None:
"""
Called every time a queue processes a put command.
If the task blocks on a full queue, this callback fires when the queue
accepts the item.
Args:
queue (AsyncQueue[T]): The queue which the item was added to.
item (T): The item that was added to the queue.
"""
pass
[docs]
def on_socket_connect_start(self, metadata: SocketConnectStartMetadata) -> None:
"""
Called when a socket connection is initiated.
Args:
metadata: Connection metadata including target address
"""
pass
[docs]
def on_socket_connect_ready(self, metadata: SocketConnectReadyMetadata) -> None:
"""
Called when a socket connection has been established.
Args:
metadata: Connection metadata including duration and target address
"""
pass
[docs]
def on_socket_recv_start(self, metadata: InstrumentationMetadata) -> None:
"""
Called when a task starts waiting on a socket recv.
Args:
metadata: Socket operation metadata
"""
pass
[docs]
def on_socket_recv_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
"""
Called when the socket is ready for reading.
Args:
metadata: Socket operation metadata including duration
"""
pass
[docs]
def on_socket_recv_data(self, metadata: SocketRecvDataMetadata) -> None:
"""
Called immediately after the actual bytes have been read.
Args:
metadata: Metadata including the received bytes
"""
pass
[docs]
def on_socket_send_data(self, metadata: "SocketSendDataMetadata") -> None:
"""
Called immediately after bytes have been sent to the kernel.
Args:
metadata: Metadata including the sent bytes and actual bytes_sent count
"""
pass
[docs]
def on_tls_handshake_complete(self, metadata: "TLSHandshakeMetadata") -> None:
"""
Called after a TLS handshake completes successfully.
Args:
metadata: Metadata including cipher info and TLS version
"""
pass
[docs]
def on_socket_send_start(self, metadata: InstrumentationMetadata) -> None:
"""
Called when a task starts waiting on a socket send.
Args:
metadata: Socket operation metadata
"""
pass
[docs]
def on_socket_send_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
"""
Called when the socket send operation completes.
Args:
metadata: Socket operation metadata including duration
"""
pass
[docs]
def on_socket_accept_start(self, metadata: InstrumentationMetadata) -> None:
"""
Called when a task starts waiting on a socket accept.
Args:
metadata: Socket operation metadata
"""
pass
[docs]
def on_socket_accept_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
"""
Called when a socket accept operation completes.
Args:
metadata: Socket operation metadata including duration
"""
pass
[docs]
def on_socket_close(self, metadata: InstrumentationMetadata) -> None:
"""
Called when a socket connection is closed.
Args:
metadata: Socket operation metadata
"""
pass
[docs]
def on_task_before_send(self, metadata: TaskSendMetadata) -> None:
"""
Called before a task's send() method is invoked.
Args:
metadata: Task send metadata including the task and value being sent
"""
pass
[docs]
def on_task_after_send(self, metadata: TaskSendMetadata, command: "Command") -> None:
"""
Called after a task's send() method completes successfully.
Args:
metadata: Task send metadata including the task and value that was sent
command: The command yielded by the task
"""
pass
[docs]
def on_task_before_throw(self, metadata: TaskThrowMetadata) -> None:
"""
Called before a task's throw() method is invoked.
Args:
metadata: Task throw metadata including the task and exception being thrown
"""
pass
[docs]
def on_task_after_throw(self, metadata: TaskThrowMetadata, command: "Command") -> None:
"""
Called after a task's throw() method completes successfully.
Args:
metadata: Task throw metadata including the task and exception that was thrown
command: The command yielded by the task
"""
pass
[docs]
def on_task_completed(self, task: "RawTask[Command, Any, Any]", result: Any) -> None:
"""
Called when a task completes successfully (StopIteration).
Args:
task: The task that completed
result: The return value of the task
"""
pass
[docs]
def on_task_error(self, task: "RawTask[Command, Any, Any]", exception: Exception) -> None:
"""
Called when a task raises an exception.
Args:
task: The task that raised the exception
exception: The exception that was raised
"""
pass
[docs]
def on_task_cancelled(self, task: "RawTask[Command, Any, Any]", exception: Exception) -> None:
"""
Called when a task is cancelled (TaskCancelled exception).
Args:
task: The task that was cancelled
exception: The TaskCancelled exception
"""
pass
def __enter__(self: Self) -> Self:
stack = _instrument_stack.get() or []
self._token = _instrument_stack.set(stack + [self])
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
if self._token is not None:
_instrument_stack.reset(self._token)
return False
[docs]
class PrintInstrument(EventLoopInstrument):
"""
Event loop instrument that prints information to stdout.
This instrument outputs detailed information about socket operations.
"""
print = print
def _get_peer_info(self, sock_handle: SocketHandle) -> tuple[str, int]:
try:
return sock_handle.socket.getpeername()
except (OSError, AttributeError):
raise
[docs]
@override
def on_socket_connect_start(self, metadata: SocketConnectStartMetadata) -> None:
peer = metadata.target_address
self.print(f"[CONNECT] Connecting to {peer}.")
[docs]
@override
def on_socket_connect_ready(self, metadata: SocketConnectReadyMetadata) -> None:
duration = metadata.finish_time - metadata.start_time
peer = self._get_peer_info(metadata.socket_handle)
self.print(
f"[CONNECT] Connected to {peer} in {duration:.4f}s via {metadata.socket_handle.socket.getsockname()}"
)
[docs]
@override
def on_socket_send_start(self, metadata: InstrumentationMetadata) -> None:
peer = self._get_peer_info(metadata.socket_handle)
self.print(f"[SEND] Starting send to {peer}")
[docs]
@override
def on_socket_send_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
duration = metadata.finish_time - metadata.start_time
peer = self._get_peer_info(metadata.socket_handle)
self.print(f"[SEND] Completed send to {peer} in {duration:.4f}s")
[docs]
@override
def on_socket_recv_start(self, metadata: InstrumentationMetadata) -> None:
peer = self._get_peer_info(metadata.socket_handle)
self.print(f"[RECV] Waiting for data from {peer}")
[docs]
@override
def on_socket_recv_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
duration = metadata.finish_time - metadata.start_time
peer = self._get_peer_info(metadata.socket_handle)
self.print(f"[RECV] Ready to receive data from {peer} in {duration:.4f}s")
[docs]
@override
def on_socket_close(self, metadata: InstrumentationMetadata) -> None:
peer = self._get_peer_info(metadata.socket_handle)
self.print(f"[CLOSE] Connection closed to {peer}")
[docs]
@override
def on_socket_recv_data(self, metadata: SocketRecvDataMetadata) -> None:
chunk_len = len(metadata.data)
# Maybe show the first 50 bytes if it's textual data:
chunk_preview = metadata.data[:50].decode("utf-8", errors="replace")
self.print(f"[RECV-DATA] Got {chunk_len} bytes: {chunk_preview!r}")
self.print(f"[RECV-DATA] FULL DATA:\n{metadata.data.decode()!r}")
[docs]
@override
def on_socket_send_data(self, metadata: SocketSendDataMetadata) -> None:
data_len = len(metadata.data)
bytes_sent = metadata.bytes_sent
# Show first 50 bytes if textual
data_preview = metadata.data[:50].decode("utf-8", errors="replace")
self.print(f"[SEND-DATA] Sent {bytes_sent}/{data_len} bytes: {data_preview!r}")
[docs]
@override
def on_tls_handshake_complete(self, metadata: TLSHandshakeMetadata) -> None:
duration = metadata.finish_time - metadata.start_time
cipher_info = metadata.cipher[0] if metadata.cipher else "unknown"
self.print(
f"[TLS] Handshake complete in {duration:.4f}s: {metadata.version}, {cipher_info}"
)
[docs]
class LogInstrument(PrintInstrument):
"""
Event loop instrument that logs information using the logging module.
This instrument uses the debug log level for all messages.
"""
print = logger.debug
[docs]
class _CompositeInstrument(EventLoopInstrument):
"""
Internal class that dispatches events to multiple instruments.
When multiple instruments are active (nested context managers), this class
wraps them and dispatches each event to all instruments in reverse order
(inner-most first, then outer).
"""
def __init__(self, instruments: list[EventLoopInstrument]):
self._instruments = instruments
[docs]
@override
def on_queue_get(self, queue: "AsyncQueue[T]", item: T, immediate: bool) -> None:
for instrument in reversed(self._instruments):
instrument.on_queue_get(queue, item, immediate)
[docs]
@override
def on_queue_put(self, queue: "AsyncQueue[T]", item: T, immediate: bool) -> None:
for instrument in reversed(self._instruments):
instrument.on_queue_put(queue, item, immediate)
[docs]
@override
def on_socket_connect_start(self, metadata: SocketConnectStartMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_connect_start(metadata)
[docs]
@override
def on_socket_connect_ready(self, metadata: SocketConnectReadyMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_connect_ready(metadata)
[docs]
@override
def on_socket_recv_start(self, metadata: InstrumentationMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_recv_start(metadata)
[docs]
@override
def on_socket_recv_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_recv_ready(metadata)
[docs]
@override
def on_socket_recv_data(self, metadata: SocketRecvDataMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_recv_data(metadata)
[docs]
@override
def on_socket_send_data(self, metadata: SocketSendDataMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_send_data(metadata)
[docs]
@override
def on_tls_handshake_complete(self, metadata: TLSHandshakeMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_tls_handshake_complete(metadata)
[docs]
@override
def on_socket_send_start(self, metadata: InstrumentationMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_send_start(metadata)
[docs]
@override
def on_socket_send_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_send_ready(metadata)
[docs]
@override
def on_socket_accept_start(self, metadata: InstrumentationMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_accept_start(metadata)
[docs]
@override
def on_socket_accept_ready(self, metadata: ReadySocketInstrumentationMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_accept_ready(metadata)
[docs]
@override
def on_socket_close(self, metadata: InstrumentationMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_socket_close(metadata)
[docs]
@override
def on_task_before_send(self, metadata: TaskSendMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_task_before_send(metadata)
[docs]
@override
def on_task_after_send(self, metadata: TaskSendMetadata, command: "Command") -> None:
for instrument in reversed(self._instruments):
instrument.on_task_after_send(metadata, command)
[docs]
@override
def on_task_before_throw(self, metadata: TaskThrowMetadata) -> None:
for instrument in reversed(self._instruments):
instrument.on_task_before_throw(metadata)
[docs]
@override
def on_task_after_throw(self, metadata: TaskThrowMetadata, command: "Command") -> None:
for instrument in reversed(self._instruments):
instrument.on_task_after_throw(metadata, command)
[docs]
@override
def on_task_completed(self, task: "RawTask[Command, Any, Any]", result: Any) -> None:
for instrument in reversed(self._instruments):
instrument.on_task_completed(task, result)
[docs]
@override
def on_task_error(self, task: "RawTask[Command, Any, Any]", exception: Exception) -> None:
for instrument in reversed(self._instruments):
instrument.on_task_error(task, exception)
[docs]
@override
def on_task_cancelled(self, task: "RawTask[Command, Any, Any]", exception: Exception) -> None:
for instrument in reversed(self._instruments):
instrument.on_task_cancelled(task, exception)
EMPTY_INSTRUMENT: Final[EventLoopInstrument] = EventLoopInstrument()
[docs]
def get_current_instrument() -> EventLoopInstrument:
"""
Get the current instrumentation context.
When multiple instruments are active (nested context managers), returns a
composite that dispatches to all of them (inner-most first).
Returns:
The currently active instrument(s) or an empty instrument if none is active.
"""
stack = _instrument_stack.get()
if not stack:
return EMPTY_INSTRUMENT
if len(stack) == 1:
return stack[0]
return _CompositeInstrument(stack)
__all__ = [
"EventLoopInstrument",
"PrintInstrument",
"LogInstrument",
"InstrumentationMetadata",
"ReadySocketInstrumentationMetadata",
"SocketRecvDataMetadata",
"SocketSendDataMetadata",
"SocketConnectStartMetadata",
"SocketConnectReadyMetadata",
"TLSHandshakeMetadata",
"TaskSendMetadata",
"TaskThrowMetadata",
"get_current_instrument",
]