flowno.core.event_loop.instrumentation
Event Loop Instrumentation System.
This module provides a flexible instrumentation system for monitoring and debugging the Flowno event loop. It captures metrics and events for socket operations, queue operations, and other event loop activities.
The instrumentation system uses a context manager pattern, allowing multiple instrumentation hooks to be applied within different scopes of code.
Example
>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.instrumentation import PrintInstrument
>>> from flowno.core.event_loop.queues import AsyncQueue
>>>
>>> # Define a simple coroutine that uses a queue
>>> async def queue_example():
... queue = AsyncQueue()
... # Put an item into the queue
... await queue.put(42)
... # Get the item back from the queue
... value = await queue.get()
... return value
>>>
>>> # Create an event loop and run with instrumentation
>>> event_loop = EventLoop()
>>> with PrintInstrument():
... result = event_loop.run_until_complete(queue_example(), join=True)
...
[QUEUE-PUT] Item put in queue: 42
[QUEUE-GET] Item retrieved from queue: 42
>>> print(result)
42
- class flowno.core.event_loop.instrumentation.EventLoopInstrument[source]
Base class for event loop instrumentation.
This class provides hooks for various event loop operations. Subclasses can override these methods to implement custom monitoring or logging.
Instruments can be nested using context managers. When nested, all active instruments receive events, with inner instruments firing before outer ones.
- on_queue_get(queue: AsyncQueue[T], item: T, immediate: bool) None[source]
Called every time a queue completes a get command.
If the task blocks on an empty queue, this callback fires when an items is available and returned.
- Parameters:
queue (AsyncQueue[T]) – The queue which the item was popped.
item (T) – The item that was popped from the queue.
- on_queue_put(queue: AsyncQueue[T], item: T, immediate: bool) None[source]
Called every time a queue processes a put command.
If the task blocks on a full queue, this callback fires when the queue accepts the item.
- Parameters:
queue (AsyncQueue[T]) – The queue which the item was added to.
item (T) – The item that was added to the queue.
- on_socket_accept_ready(metadata: ReadySocketInstrumentationMetadata) None[source]
Called when a socket accept operation completes.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_accept_start(metadata: InstrumentationMetadata) None[source]
Called when a task starts waiting on a socket accept.
- Parameters:
metadata – Socket operation metadata
- on_socket_close(metadata: InstrumentationMetadata) None[source]
Called when a socket connection is closed.
- Parameters:
metadata – Socket operation metadata
- on_socket_connect_ready(metadata: SocketConnectReadyMetadata) None[source]
Called when a socket connection has been established.
- Parameters:
metadata – Connection metadata including duration and target address
- on_socket_connect_start(metadata: SocketConnectStartMetadata) None[source]
Called when a socket connection is initiated.
- Parameters:
metadata – Connection metadata including target address
- on_socket_recv_data(metadata: SocketRecvDataMetadata) None[source]
Called immediately after the actual bytes have been read.
- Parameters:
metadata – Metadata including the received bytes
- on_socket_recv_ready(metadata: ReadySocketInstrumentationMetadata) None[source]
Called when the socket is ready for reading.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_recv_start(metadata: InstrumentationMetadata) None[source]
Called when a task starts waiting on a socket recv.
- Parameters:
metadata – Socket operation metadata
- on_socket_send_data(metadata: SocketSendDataMetadata) None[source]
Called immediately after bytes have been sent to the kernel.
- Parameters:
metadata – Metadata including the sent bytes and actual bytes_sent count
- on_socket_send_ready(metadata: ReadySocketInstrumentationMetadata) None[source]
Called when the socket send operation completes.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_send_start(metadata: InstrumentationMetadata) None[source]
Called when a task starts waiting on a socket send.
- Parameters:
metadata – Socket operation metadata
- on_task_after_send(metadata: TaskSendMetadata, command: Command) None[source]
Called after a task’s send() method completes successfully.
- Parameters:
metadata – Task send metadata including the task and value that was sent
command – The command yielded by the task
- on_task_after_throw(metadata: TaskThrowMetadata, command: Command) None[source]
Called after a task’s throw() method completes successfully.
- Parameters:
metadata – Task throw metadata including the task and exception that was thrown
command – The command yielded by the task
- on_task_before_send(metadata: TaskSendMetadata) None[source]
Called before a task’s send() method is invoked.
- Parameters:
metadata – Task send metadata including the task and value being sent
- on_task_before_throw(metadata: TaskThrowMetadata) None[source]
Called before a task’s throw() method is invoked.
- Parameters:
metadata – Task throw metadata including the task and exception being thrown
- on_task_cancelled(task: RawTask[Command, Any, Any], exception: Exception) None[source]
Called when a task is cancelled (TaskCancelled exception).
- Parameters:
task – The task that was cancelled
exception – The TaskCancelled exception
- on_task_completed(task: RawTask[Command, Any, Any], result: Any) None[source]
Called when a task completes successfully (StopIteration).
- Parameters:
task – The task that completed
result – The return value of the task
- on_task_error(task: RawTask[Command, Any, Any], exception: Exception) None[source]
Called when a task raises an exception.
- Parameters:
task – The task that raised the exception
exception – The exception that was raised
- on_tls_handshake_complete(metadata: TLSHandshakeMetadata) None[source]
Called after a TLS handshake completes successfully.
- Parameters:
metadata – Metadata including cipher info and TLS version
- class flowno.core.event_loop.instrumentation.InstrumentationMetadata(_task: RawTask[Any, None, None], _command: Command, socket_handle: SocketHandle, immediate: bool = False, start_time: float = <factory>)[source]
Base metadata class for instrumentation events.
- class flowno.core.event_loop.instrumentation.LogInstrument[source]
Event loop instrument that logs information using the logging module.
This instrument uses the debug log level for all messages.
- print(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘DEBUG’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.debug(“Houston, we have a %s”, “thorny problem”, exc_info=True)
- class flowno.core.event_loop.instrumentation.PrintInstrument[source]
Event loop instrument that prints information to stdout.
This instrument outputs detailed information about socket operations.
- on_socket_close(metadata: InstrumentationMetadata) None[source]
Called when a socket connection is closed.
- Parameters:
metadata – Socket operation metadata
- on_socket_connect_ready(metadata: SocketConnectReadyMetadata) None[source]
Called when a socket connection has been established.
- Parameters:
metadata – Connection metadata including duration and target address
- on_socket_connect_start(metadata: SocketConnectStartMetadata) None[source]
Called when a socket connection is initiated.
- Parameters:
metadata – Connection metadata including target address
- on_socket_recv_data(metadata: SocketRecvDataMetadata) None[source]
Called immediately after the actual bytes have been read.
- Parameters:
metadata – Metadata including the received bytes
- on_socket_recv_ready(metadata: ReadySocketInstrumentationMetadata) None[source]
Called when the socket is ready for reading.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_recv_start(metadata: InstrumentationMetadata) None[source]
Called when a task starts waiting on a socket recv.
- Parameters:
metadata – Socket operation metadata
- on_socket_send_data(metadata: SocketSendDataMetadata) None[source]
Called immediately after bytes have been sent to the kernel.
- Parameters:
metadata – Metadata including the sent bytes and actual bytes_sent count
- on_socket_send_ready(metadata: ReadySocketInstrumentationMetadata) None[source]
Called when the socket send operation completes.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_send_start(metadata: InstrumentationMetadata) None[source]
Called when a task starts waiting on a socket send.
- Parameters:
metadata – Socket operation metadata
- on_tls_handshake_complete(metadata: TLSHandshakeMetadata) None[source]
Called after a TLS handshake completes successfully.
- Parameters:
metadata – Metadata including cipher info and TLS version
- print(*args, sep=' ', end='\n', file=None, flush=False)
Prints the values to a stream, or to sys.stdout by default.
- sep
string inserted between values, default a space.
- end
string appended after the last value, default a newline.
- file
a file-like object (stream); defaults to the current sys.stdout.
- flush
whether to forcibly flush the stream.
- class flowno.core.event_loop.instrumentation.ReadySocketInstrumentationMetadata(_task: RawTask[Any, None, None], _command: Command, socket_handle: SocketHandle, immediate: bool = False, start_time: float = <factory>, finish_time: float = <factory>)[source]
Metadata for socket operations that have completed.
- class flowno.core.event_loop.instrumentation.SocketConnectReadyMetadata(socket_handle: SocketHandle, target_address: _Address, immediate: bool = False, start_time: float = <factory>, finish_time: float = <factory>)[source]
Metadata for completed socket connections.
- class flowno.core.event_loop.instrumentation.SocketConnectStartMetadata(socket_handle: SocketHandle, target_address: _Address, immediate: bool = False, start_time: float = <factory>)[source]
Metadata for socket connection initiation.
- class flowno.core.event_loop.instrumentation.SocketRecvDataMetadata(socket_handle: SocketHandle, data: bytes)[source]
Metadata for received socket data.
- class flowno.core.event_loop.instrumentation.SocketSendDataMetadata(socket_handle: SocketHandle, data: bytes, bytes_sent: int)[source]
Metadata for sent socket data.
- class flowno.core.event_loop.instrumentation.TLSHandshakeMetadata(socket_handle: SocketHandle, cipher: tuple[str, str, int] | None, version: str | None, server_hostname: str | None, start_time: float, finish_time: float = <factory>)[source]
Metadata for completed TLS handshakes.
- class flowno.core.event_loop.instrumentation.TaskSendMetadata(task: RawTask[Command, Any, Any], send_value: Any)[source]
Metadata for task send operations.
- class flowno.core.event_loop.instrumentation.TaskThrowMetadata(task: RawTask[Command, Any, Any], exception: Exception)[source]
Metadata for task throw operations.
- class flowno.core.event_loop.instrumentation._CompositeInstrument(instruments: list[EventLoopInstrument])[source]
Internal class that dispatches events to multiple instruments.
When multiple instruments are active (nested context managers), this class wraps them and dispatches each event to all instruments in reverse order (inner-most first, then outer).
- on_queue_get(queue: AsyncQueue[T], item: T, immediate: bool) None[source]
Called every time a queue completes a get command.
If the task blocks on an empty queue, this callback fires when an items is available and returned.
- Parameters:
queue (AsyncQueue[T]) – The queue which the item was popped.
item (T) – The item that was popped from the queue.
- on_queue_put(queue: AsyncQueue[T], item: T, immediate: bool) None[source]
Called every time a queue processes a put command.
If the task blocks on a full queue, this callback fires when the queue accepts the item.
- Parameters:
queue (AsyncQueue[T]) – The queue which the item was added to.
item (T) – The item that was added to the queue.
- on_socket_accept_ready(metadata: ReadySocketInstrumentationMetadata) None[source]
Called when a socket accept operation completes.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_accept_start(metadata: InstrumentationMetadata) None[source]
Called when a task starts waiting on a socket accept.
- Parameters:
metadata – Socket operation metadata
- on_socket_close(metadata: InstrumentationMetadata) None[source]
Called when a socket connection is closed.
- Parameters:
metadata – Socket operation metadata
- on_socket_connect_ready(metadata: SocketConnectReadyMetadata) None[source]
Called when a socket connection has been established.
- Parameters:
metadata – Connection metadata including duration and target address
- on_socket_connect_start(metadata: SocketConnectStartMetadata) None[source]
Called when a socket connection is initiated.
- Parameters:
metadata – Connection metadata including target address
- on_socket_recv_data(metadata: SocketRecvDataMetadata) None[source]
Called immediately after the actual bytes have been read.
- Parameters:
metadata – Metadata including the received bytes
- on_socket_recv_ready(metadata: ReadySocketInstrumentationMetadata) None[source]
Called when the socket is ready for reading.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_recv_start(metadata: InstrumentationMetadata) None[source]
Called when a task starts waiting on a socket recv.
- Parameters:
metadata – Socket operation metadata
- on_socket_send_data(metadata: SocketSendDataMetadata) None[source]
Called immediately after bytes have been sent to the kernel.
- Parameters:
metadata – Metadata including the sent bytes and actual bytes_sent count
- on_socket_send_ready(metadata: ReadySocketInstrumentationMetadata) None[source]
Called when the socket send operation completes.
- Parameters:
metadata – Socket operation metadata including duration
- on_socket_send_start(metadata: InstrumentationMetadata) None[source]
Called when a task starts waiting on a socket send.
- Parameters:
metadata – Socket operation metadata
- on_task_after_send(metadata: TaskSendMetadata, command: Command) None[source]
Called after a task’s send() method completes successfully.
- Parameters:
metadata – Task send metadata including the task and value that was sent
command – The command yielded by the task
- on_task_after_throw(metadata: TaskThrowMetadata, command: Command) None[source]
Called after a task’s throw() method completes successfully.
- Parameters:
metadata – Task throw metadata including the task and exception that was thrown
command – The command yielded by the task
- on_task_before_send(metadata: TaskSendMetadata) None[source]
Called before a task’s send() method is invoked.
- Parameters:
metadata – Task send metadata including the task and value being sent
- on_task_before_throw(metadata: TaskThrowMetadata) None[source]
Called before a task’s throw() method is invoked.
- Parameters:
metadata – Task throw metadata including the task and exception being thrown
- on_task_cancelled(task: RawTask[Command, Any, Any], exception: Exception) None[source]
Called when a task is cancelled (TaskCancelled exception).
- Parameters:
task – The task that was cancelled
exception – The TaskCancelled exception
- on_task_completed(task: RawTask[Command, Any, Any], result: Any) None[source]
Called when a task completes successfully (StopIteration).
- Parameters:
task – The task that completed
result – The return value of the task
- on_task_error(task: RawTask[Command, Any, Any], exception: Exception) None[source]
Called when a task raises an exception.
- Parameters:
task – The task that raised the exception
exception – The exception that was raised
- on_tls_handshake_complete(metadata: TLSHandshakeMetadata) None[source]
Called after a TLS handshake completes successfully.
- Parameters:
metadata – Metadata including cipher info and TLS version
- flowno.core.event_loop.instrumentation.get_current_instrument() EventLoopInstrument[source]
Get the current instrumentation context.
When multiple instruments are active (nested context managers), returns a composite that dispatches to all of them (inner-most first).
- Returns:
The currently active instrument(s) or an empty instrument if none is active.