Source code for flowno.core.event_loop.tasks

"""
Task management for the Flowno event loop.

This module provides classes for creating, managing, and interacting with asynchronous 
tasks in the Flowno event loop. The main class is TaskHandle, which represents a 
spawned task and allows operations like cancellation and joining.

Examples:
    >>> from flowno.core.event_loop.event_loop import EventLoop
    >>> from flowno.core.event_loop.primitives import spawn
    >>> 
    >>> async def worker():
    ...     # Some async work
    ...     return 42
    >>> 
    >>> async def main():
    ...     # Spawn a new task
    ...     task = await spawn(worker())
    ...     
    ...     # Check if it's a TaskHandle
    ...     print(f"task is a TaskHandle: {isinstance(task, TaskHandle)}")
    ...     
    ...     # Wait for the task to complete
    ...     result = await task.join()
    ...     print(f"Task result: {result}")
    ...     
    ...     return result
    >>> 
    >>> # Run the example
    >>> loop = EventLoop()
    >>> result = loop.run_until_complete(main(), join=True)
    task is a TaskHandle: True
    Task result: 42
    >>> print(result)
    42
"""

from collections.abc import Generator
from dataclasses import dataclass
from types import coroutine
from typing import TYPE_CHECKING, Generic, TypeVar, cast, final

from .types import RawTask

if TYPE_CHECKING:
    from .event_loop import EventLoop  # Only import during type checking

from .commands import CancelCommand, Command, JoinCommand

_T_co = TypeVar("_T_co", covariant=True)


[docs] @final class TaskHandle(Generic[_T_co]): """ A handle to a spawned task that allows cancellation and joining. TaskHandle objects are returned by the spawn primitive and represent concurrent tasks running in the event loop. They can be used to: - Check if a task has completed, failed, or been cancelled - Cancel a running task - Join (wait for) a task to complete and get its return value """ def __init__(self, event_loop: "EventLoop", raw_task: RawTask[Command, object, _T_co]): """ Initialize a TaskHandle. Args: event_loop: The event loop managing this task raw_task: The raw coroutine task being managed """ self.event_loop = event_loop self.raw_task = raw_task
[docs] def cancel( self, ) -> Generator[CancelCommand[_T_co], object, _T_co]: """ Cancel this task and wait for it to complete. This method immediately initiates cancellation of the task and returns an awaitable that can be used to wait for the cancellation to complete and retrieve any return value from the cancelled task. If the task is still running, it will receive a TaskCancelled exception. If the task has already finished, errored, or been cancelled, this method returns the existing result or exception without sending TaskCancelled. When awaited, you will receive: - The value returned by the task (e.g., from finally block or except handler) - None if the task let TaskCancelled propagate (successful cancellation) - Any other exception raised during cleanup or prior to cancellation Returns: An awaitable that resolves to the task's result. Raises: Exception: Any exception that was raised by the task (only when awaited). Note: Even if you don't await the return value, running tasks will still be cancelled. Awaiting allows you to wait for cleanup to complete and get any return value. """ # Immediately initiate cancellation (fire-and-forget for non-awaited case) self.event_loop.cancel(self.raw_task) # Return an awaitable that yields CancelCommand for the event loop to handle # The CancelCommand handler will wait for the task to finish and return its result @coroutine def _wait_for_cancellation() -> Generator[CancelCommand[_T_co], object, _T_co]: received = yield CancelCommand(self) return cast(_T_co, received) return _wait_for_cancellation()
@property def is_finished(self) -> bool: """ Check if the task completed successfully. Returns: True if the task has finished execution without errors. """ return self.raw_task in self.event_loop.finished @property def is_error(self) -> bool: """ Check if the task completed with an error. Returns: True if the task raised an exception (not including cancellation). """ return self.raw_task in self.event_loop.exceptions and not self.is_cancelled @property def is_cancelled(self) -> bool: """ Check if the task was cancelled. Returns: True if the task was cancelled. """ return self.raw_task in self.event_loop.cancelled
[docs] @coroutine def join( self, ) -> Generator[JoinCommand[_T_co], object, _T_co]: """ Wait for this task to complete and get its result. This is a coroutine that yields a JoinCommand for the event loop to process. When the task completes, this coroutine will resume with its result. Returns: The value returned by the task. Raises: Exception: Any exception that was raised by the task. TaskCancelled: If the task was cancelled. """ received = yield JoinCommand(self) return cast(_T_co, received)
[docs] @dataclass class TaskCancelled(Exception): """ Exception raised when a task is cancelled. This exception is raised when attempting to join a task that has been cancelled. """ by: TaskHandle[object] def __str__(self) -> str: return f"Task was cancelled"
__all__ = [ "TaskHandle", "TaskCancelled", ]