flowno.core.event_loop.queues
Asynchronous queue implementations for the Flowno event loop.
This module provides queue classes that integrate with Flowno’s custom event loop, allowing tasks to safely exchange data and coordinate their execution. These queues implement the AsyncIterator protocol, making them convenient for use in async for loops.
Examples
Basic queue operations:
>>> from flowno.core.event_loop.event_loop import EventLoop
>>> from flowno.core.event_loop.queues import AsyncQueue
>>>
>>> async def producer_consumer():
... # Create a queue with maximum size 2
... queue = AsyncQueue(maxsize=2)
...
... # Put some items into the queue
... await queue.put("task 1")
... await queue.put("task 2")
...
... # Peek at the first item without removing it
... first = await queue.peek()
...
... # Get and process items
... item1 = await queue.get()
... item2 = await queue.get()
...
... # Close the queue when done
... await queue.close()
... return (first, item1, item2)
>>>
>>> loop = EventLoop()
>>> result = loop.run_until_complete(producer_consumer(), join=True)
>>> result
('task 1', 'task 1', 'task 2')
Using a queue as an async iterator:
>>> async def queue_iterator_example():
... queue = AsyncQueue()
...
... # Add some items
... for i in range(3):
... await queue.put(f"item {i}")
...
... # Process all items using async for
... results = []
... async for item in queue.until_empty():
... results.append(item)
...
... return results
>>>
>>> loop = EventLoop()
>>> loop.run_until_complete(queue_iterator_example(), join=True)
['item 0', 'item 1', 'item 2']
- class flowno.core.event_loop.queues.AsyncMapQueue(maxsize: int | None = None)[source]
An async queue that maps keys to lists of causes (reasons).
This queue maintains unique keys in FIFO order while accumulating multiple causes for each key. When a key is put multiple times, the causes are appended to that key’s list rather than duplicating the key in the queue.
Example
>>> from flowno.core.event_loop.event_loop import EventLoop >>> from flowno.core.event_loop.queues import AsyncMapQueue >>> >>> async def map_queue_example(): ... queue = AsyncMapQueue() ... ... # Put same key multiple times with different reasons ... await queue.put(("node1", "completed generation (0,)")) ... await queue.put(("node2", "barrier released")) ... await queue.put(("node1", "received stalled request")) ... ... print(len(queue)) # 2 (unique keys: node1, node2) ... ... # Get returns (key, [causes]) ... key, causes = await queue.get() ... print(key, causes) ... # ('node1', ['completed generation (0,)', 'received stalled request']) ... ... key, causes = await queue.get() ... print(key, causes) ... # ('node2', ['barrier released']) ... ... return "done" >>> >>> loop = EventLoop() >>> loop.run_until_complete(map_queue_example(), join=True) 2 node1 ['completed generation (0,)', 'received stalled request'] node2 ['barrier released'] 'done'
- async get() tuple[_K, list[str]][source]
Get the next (key, causes) pair from the queue.
Returns the key and all accumulated causes for that key, then removes the key from the queue and map.
- Returns:
A tuple of (key, list of causes)
- Raises:
QueueClosedError – If the queue is closed and empty
- async peek() tuple[_K, list[str]][source]
Peek at the next (key, causes) pair without removing it.
- Returns:
A tuple of (key, list of causes)
- Raises:
QueueClosedError – If the queue is closed and empty
- async put(item: tuple[_K, str]) None[source]
Put a (key, cause) pair into the queue.
If the key already exists in the queue, the cause is appended to that key’s list. Otherwise, the key is added to the queue with a new list containing the cause.
- Parameters:
item – A tuple of (key, cause) where cause describes why the key was enqueued
- Raises:
QueueClosedError – If the queue is closed
ValueError – If item is not a 2-tuple of (key, cause)
- class flowno.core.event_loop.queues.AsyncQueue(maxsize: int | None = None)[source]
An asynchronous queue for the Flowno event loop.
This queue allows tasks to exchange data safely, with proper synchronization handled by the event loop. When used as an async iterator, it yields items until the queue is closed.
- Parameters:
maxsize – The maximum number of items allowed in the queue. When the queue reaches this size, put() operations will block until items are removed. If None, the queue size is unbounded.
- async close() None[source]
Close the queue, preventing further put operations.
- After closing:
put() will raise QueueClosedError
get() will succeed until the queue is empty, then raise QueueClosedError
AsyncIterator interface will stop iteration when the queue is empty
- close_nowait(event_loop: EventLoop) None[source]
Synchronously close the queue.
This method is designed to be called from within a command handler, where async operations are not possible. It handles waiter notification synchronously through the event loop.
- Parameters:
event_loop – The event loop to use for notifying waiters
- async get() _T[source]
Get an item from the queue.
If the queue is empty, this will wait until an item is put into the queue.
- Returns:
The next item from the queue
- Raises:
QueueClosedError – If the queue is closed and empty
- is_closed() bool[source]
Check if the queue is closed.
- Returns:
True if the queue is closed, False otherwise
- async peek() _T[source]
Peek at the next item without removing it from the queue.
If the queue is empty, this will wait until an item is put into the queue.
- Returns:
The next item from the queue (without removing it)
- Raises:
QueueClosedError – If the queue is closed and empty
- async put(item: _T) None[source]
Put an item into the queue.
If the queue is full and has a maxsize, this will wait until space is available.
- Parameters:
item – The item to put into the queue
- Raises:
QueueClosedError – If the queue is closed
- until_empty() AsyncIterator[_T][source]
Get an async iterator that consumes all items until the queue is empty.
This iterator will close the queue automatically when all items are consumed, unless specified otherwise.
- Returns:
An async iterator that yields items until the queue is empty
- class flowno.core.event_loop.queues.AsyncSetQueue(maxsize: int | None = None)[source]
A queue variant that ensures each item appears only once.
This queue behaves like a standard AsyncQueue, but automatically deduplicates items based on equality.
Example
>>> from flowno.core.event_loop.event_loop import EventLoop >>> from flowno.core.event_loop.queues import AsyncSetQueue >>> >>> async def set_queue_example(): ... queue = AsyncSetQueue() ... ... # Add some items with duplicates ... await queue.put("apple") ... await queue.put("banana") ... await queue.put("apple") # This won't be added again ... await queue.put("cherry") ... ... # Get all unique items ... items = [] ... while len(queue) > 0: ... items.append(await queue.get()) ... ... return items >>> >>> loop = EventLoop() >>> loop.run_until_complete(set_queue_example(), join=True) ['apple', 'banana', 'cherry']
- async put(item: _T) None[source]
Put an item into the queue if it’s not already present.
- Parameters:
item – The item to put into the queue
- Raises:
QueueClosedError – If the queue is closed
- async putAll(items: list[_T]) None[source]
Put multiple unique items into the queue.
- Parameters:
items – A list of items to add to the queue
- Raises:
QueueClosedError – If the queue is closed
- put_nowait(item: _T, event_loop: EventLoop) bool[source]
Synchronously put an item into the queue if not already present.
This method is designed to be called from within a command handler, where async operations are not possible. It handles waiter notification synchronously through the event loop.
- Parameters:
item – The item to put into the queue
event_loop – The event loop to use for notifying waiters
- Returns:
True if the item was added, False if it was already present
- Raises:
QueueClosedError – If the queue is closed
- exception flowno.core.event_loop.queues.QueueClosedError[source]
Raised when attempting to put/get on a closed queue.
- class flowno.core.event_loop.queues._UntilEmptyIterator(queue: AsyncQueue[_T], self_closing: bool = True)[source]
Helper class for implementing the until_empty method.