flowno.core.event_loop.queues

Asynchronous queue implementations for the Flowno event loop.

This module provides queue classes that integrate with Flowno’s custom event loop, allowing tasks to safely exchange data and coordinate their execution. These queues implement the AsyncIterator protocol, making them convenient for use in async for loops.

Examples

Basic queue operations:

>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.queues import AsyncQueue
>>>
>>> async def producer_consumer():
...     # Create a queue with maximum size 2
...     queue = AsyncQueue(maxsize=2)
...
...     # Put some items into the queue
...     await queue.put("task 1")
...     await queue.put("task 2")
...
...     # Peek at the first item without removing it
...     first = await queue.peek()
...
...     # Get and process items
...     item1 = await queue.get()
...     item2 = await queue.get()
...
...     # Close the queue when done
...     await queue.close()
...     return (first, item1, item2)
>>>
>>> loop = EventLoop()
>>> result = loop.run_until_complete(producer_consumer(), join=True)
>>> result
('task 1', 'task 1', 'task 2')

Using a queue as an async iterator:

>>> async def queue_iterator_example():
...     queue = AsyncQueue()
...
...     # Add some items
...     for i in range(3):
...         await queue.put(f"item {i}")
...
...     # Process all items using async for
...     results = []
...     async for item in queue.until_empty():
...         results.append(item)
...
...     return results
>>>
>>> loop = EventLoop()
>>> loop.run_until_complete(queue_iterator_example(), join=True)
['item 0', 'item 1', 'item 2']
class flowno.core.event_loop.queues.AsyncMapQueue(maxsize: int | None = None)[source]

An async queue that maps keys to lists of causes (reasons).

This queue maintains unique keys in FIFO order while accumulating multiple causes for each key. When a key is put multiple times, the causes are appended to that key’s list rather than duplicating the key in the queue.

Example

>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.queues import AsyncMapQueue
>>>
>>> async def map_queue_example():
...     queue = AsyncMapQueue()
...
...     # Put same key multiple times with different reasons
...     await queue.put(("node1", "completed generation (0,)"))
...     await queue.put(("node2", "barrier released"))
...     await queue.put(("node1", "received stalled request"))
...
...     print(len(queue))  # 2 (unique keys: node1, node2)
...
...     # Get returns (key, [causes])
...     key, causes = await queue.get()
...     print(key, causes)
...     # ('node1', ['completed generation (0,)', 'received stalled request'])
...
...     key, causes = await queue.get()
...     print(key, causes)
...     # ('node2', ['barrier released'])
...
...     return "done"
>>>
>>> loop = EventLoop()
>>> loop.run_until_complete(map_queue_example(), join=True)
2
node1 ['completed generation (0,)', 'received stalled request']
node2 ['barrier released']
'done'
async close() None[source]

Close the queue, preventing further put operations.

async get() tuple[_K, list[str]][source]

Get the next (key, causes) pair from the queue.

Returns the key and all accumulated causes for that key, then removes the key from the queue and map.

Returns:

A tuple of (key, list of causes)

Raises:

QueueClosedError – If the queue is closed and empty

is_closed() bool[source]

Check if the queue is closed.

async peek() tuple[_K, list[str]][source]

Peek at the next (key, causes) pair without removing it.

Returns:

A tuple of (key, list of causes)

Raises:

QueueClosedError – If the queue is closed and empty

async put(item: tuple[_K, str]) None[source]

Put a (key, cause) pair into the queue.

If the key already exists in the queue, the cause is appended to that key’s list. Otherwise, the key is added to the queue with a new list containing the cause.

Parameters:

item – A tuple of (key, cause) where cause describes why the key was enqueued

Raises:
class flowno.core.event_loop.queues.AsyncQueue(maxsize: int | None = None)[source]

An asynchronous queue for the Flowno event loop.

This queue allows tasks to exchange data safely, with proper synchronization handled by the event loop. When used as an async iterator, it yields items until the queue is closed.

Parameters:

maxsize – The maximum number of items allowed in the queue. When the queue reaches this size, put() operations will block until items are removed. If None, the queue size is unbounded.

async close() None[source]

Close the queue, preventing further put operations.

After closing:
  • put() will raise QueueClosedError

  • get() will succeed until the queue is empty, then raise QueueClosedError

  • AsyncIterator interface will stop iteration when the queue is empty

close_nowait(event_loop: EventLoop) None[source]

Synchronously close the queue.

This method is designed to be called from within a command handler, where async operations are not possible. It handles waiter notification synchronously through the event loop.

Parameters:

event_loop – The event loop to use for notifying waiters

async get() _T[source]

Get an item from the queue.

If the queue is empty, this will wait until an item is put into the queue.

Returns:

The next item from the queue

Raises:

QueueClosedError – If the queue is closed and empty

is_closed() bool[source]

Check if the queue is closed.

Returns:

True if the queue is closed, False otherwise

async peek() _T[source]

Peek at the next item without removing it from the queue.

If the queue is empty, this will wait until an item is put into the queue.

Returns:

The next item from the queue (without removing it)

Raises:

QueueClosedError – If the queue is closed and empty

async put(item: _T) None[source]

Put an item into the queue.

If the queue is full and has a maxsize, this will wait until space is available.

Parameters:

item – The item to put into the queue

Raises:

QueueClosedError – If the queue is closed

until_empty() AsyncIterator[_T][source]

Get an async iterator that consumes all items until the queue is empty.

This iterator will close the queue automatically when all items are consumed, unless specified otherwise.

Returns:

An async iterator that yields items until the queue is empty

class flowno.core.event_loop.queues.AsyncSetQueue(maxsize: int | None = None)[source]

A queue variant that ensures each item appears only once.

This queue behaves like a standard AsyncQueue, but automatically deduplicates items based on equality.

Example

>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.queues import AsyncSetQueue
>>>
>>> async def set_queue_example():
...     queue = AsyncSetQueue()
...
...     # Add some items with duplicates
...     await queue.put("apple")
...     await queue.put("banana")
...     await queue.put("apple")  # This won't be added again
...     await queue.put("cherry")
...
...     # Get all unique items
...     items = []
...     while len(queue) > 0:
...         items.append(await queue.get())
...
...     return items
>>>
>>> loop = EventLoop()
>>> loop.run_until_complete(set_queue_example(), join=True)
['apple', 'banana', 'cherry']
async put(item: _T) None[source]

Put an item into the queue if it’s not already present.

Parameters:

item – The item to put into the queue

Raises:

QueueClosedError – If the queue is closed

async putAll(items: list[_T]) None[source]

Put multiple unique items into the queue.

Parameters:

items – A list of items to add to the queue

Raises:

QueueClosedError – If the queue is closed

put_nowait(item: _T, event_loop: EventLoop) bool[source]

Synchronously put an item into the queue if not already present.

This method is designed to be called from within a command handler, where async operations are not possible. It handles waiter notification synchronously through the event loop.

Parameters:
  • item – The item to put into the queue

  • event_loop – The event loop to use for notifying waiters

Returns:

True if the item was added, False if it was already present

Raises:

QueueClosedError – If the queue is closed

exception flowno.core.event_loop.queues.QueueClosedError[source]

Raised when attempting to put/get on a closed queue.

class flowno.core.event_loop.queues._UntilEmptyIterator(queue: AsyncQueue[_T], self_closing: bool = True)[source]

Helper class for implementing the until_empty method.