from collections.abc import Callable
import inspect
import logging
from dataclasses import dataclass
from types import TracebackType
from typing import Any, ClassVar, TypeVar, cast
from flowno.core.flow.flow import Flow
from flowno.core.node_base import (
DraftInputPortRef,
DraftNode,
DraftOutputPortRef,
FinalizedNode,
NodePlaceholder,
OutputPortRefPlaceholder,
)
from flowno.core.group_node import DraftGroupNode
from typing_extensions import Self, TypeVarTuple, Unpack, override
from collections import OrderedDict
logger = logging.getLogger(__name__)
_Ts = TypeVarTuple("_Ts")
_ReturnTupleT_co = TypeVar("_ReturnTupleT_co", covariant=True, bound=tuple[object, ...])
[docs]
class FlowHDLView:
"""Base implementation of the :class:`FlowHDL` attribute protocol.
``FlowHDLView`` acts like a simple namespace for draft nodes. Public
attribute assignments are stored in ``self._nodes`` while private names
(those starting with ``_``) behave like normal Python attributes. Accessing
an undefined public attribute before the view is finalized returns a
:class:`~flowno.core.node_base.NodePlaceholder` so that connections can be
declared before the target node is defined. Once finalized, attribute
lookups behave normally and missing attributes raise :class:`AttributeError`.
"""
_is_finalized: bool
KEYWORDS: ClassVar[list[str]] = ["register_child_result"]
contextStack: ClassVar[OrderedDict[Self, list[DraftNode]]] = OrderedDict()
[docs]
@dataclass
class FinalizationResult:
nodes: dict[str, Any]
finalized_nodes: dict[
DraftNode[Unpack[tuple[object, ...]], tuple[object, ...]],
FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]],
]
def __init__(self, on_register_finalized_node: Callable[[FinalizedNode], None]) -> None:
self._is_finalized = False
self._nodes: dict[str, Any] = {} # pyright: ignore[reportExplicitAny]
self._child_results: list[FlowHDLView.FinalizationResult] = []
self._on_register_finalized_node = on_register_finalized_node
def __enter__(self: Self) -> Self:
"""Enter the context by adding this instance to the context stack."""
self.__class__.contextStack[self] = []
self._child_results = []
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
"""Finalize the graph when exiting the context by calling :meth:`_finalize`."""
_, draft_nodes = self.__class__.contextStack.popitem()
finalize_connections = len(self.__class__.contextStack) == 0
result = self._finalize(draft_nodes, finalize_connections=finalize_connections)
if self.__class__.contextStack:
parent = next(reversed(self.__class__.contextStack))
parent.register_child_result(result)
return False
@override
def __setattr__(self, key: str, value: Any) -> None:
"""Override the default attribute setter to store nodes in a dictionary.
Ignores attributes starting with an underscore or in the KEYWORDS list.
"""
# Allow setting the _is_finalized attribute (in __init__)
if key.startswith("_") and not key in self.__class__.KEYWORDS:
return super().__setattr__(key, value)
else:
self._nodes[key] = value
@override
def __getattribute__(self, key: str) -> NodePlaceholder:
"""
Override the default attribute getter to return a placeholder for
undefined attributes.
Treats attributes starting with an underscore or in the KEYWORDS list
as normal attributes.
"""
if key.startswith("_") or key in self.__class__.KEYWORDS:
return super().__getattribute__(key)
elif key in self._nodes:
return self._nodes[key]
else:
raise AttributeError(f'Attribute "{key}" not found')
def __getattr__(self, key: str) -> Any:
if self._is_finalized:
raise AttributeError(f'Attribute "{key}" not found')
return NodePlaceholder(key)
[docs]
@classmethod
def register_node(cls, node: DraftNode[Unpack[_Ts], _ReturnTupleT_co]) -> None:
"""Register a draft node in the context stack."""
if cls.contextStack:
# Get the last FlowHDL instance in the context stack
last_hdl = next(reversed(cls.contextStack))
cls.contextStack[last_hdl].append(node)
else:
# raise RuntimeError("No FlowHDL context is active to register the node.")
logger.warning(
f"Node, {node}, registered outside of FlowHDL context. "
"This node will not be automatically finalized."
)
[docs]
def register_child_result(self, result: "FlowHDLView.FinalizationResult") -> None:
"""Register a finalized child context result with this view."""
self._child_results.append(result)
def _finalize(self, draft_nodes: list[DraftNode], *, finalize_connections: bool = True) -> "FlowHDLView.FinalizationResult":
"""Finalize all the draft nodes instantiated in the FlowHDL context.
Replace nodes defined in the FlowHDL context with their finalized
counterparts, resolving all `OutputPortRefPlaceholder` instances to
actual `DraftOutputPortRefs`.
Args:
draft_nodes (list[DraftNode]): A list of draft nodes that were created
within this "layer" of the FlowHDL context.
"""
logger.info("Finalizing FlowHDL")
all_draft_nodes: list[DraftNode] = []
finalized_nodes: dict[
DraftNode[Unpack[tuple[object, ...]], tuple[object, ...]],
FinalizedNode[Unpack[tuple[object, ...]], tuple[object, ...]],
] = dict()
for child in self._child_results:
self._nodes.update(child.nodes)
finalized_nodes.update(child.finalized_nodes)
all_draft_nodes.extend(child.finalized_nodes.keys())
# Map DraftGroupNodes to the draft node returned from the template.
group_alias: dict[DraftGroupNode, DraftNode] = {}
def resolve_target(node: DraftNode) -> DraftNode:
while isinstance(node, DraftGroupNode):
if node in group_alias:
node = group_alias[node]
else:
node = node._return_node
return node
# Replace any DraftGroupNode references stored on this view with the
# node that the group returned. These returned nodes are already part of
# ``child.finalized_nodes`` and will be finalized alongside all other
# draft nodes.
for name, obj in list(self._nodes.items()):
if isinstance(obj, DraftGroupNode):
# Emit debug information when we first encounter the group node.
obj.debug_dummy()
target = resolve_target(obj._return_node)
group_alias[obj] = target
self._nodes[name] = target
clean_draft_nodes: list[DraftNode] = []
for dn in draft_nodes:
if isinstance(dn, DraftGroupNode):
# Avoid printing debug info twice if the group was stored on
# this view and processed above.
if dn not in group_alias:
dn.debug_dummy()
target = resolve_target(dn._return_node)
group_alias.setdefault(dn, target)
continue
all_draft_nodes.append(dn)
clean_draft_nodes.append(dn)
# Redirect any connections that target a DraftGroupNode to the draft
# node produced by the group. The group node itself is dropped from the
# graph so upstream and downstream links must be rewired.
for draft_node in all_draft_nodes:
for input_port in draft_node._input_ports.values():
conn = input_port.connected_output
if (
isinstance(conn, DraftOutputPortRef)
and isinstance(conn.node, DraftGroupNode)
and conn.node in group_alias
):
group_node = conn.node
replacement = group_alias[group_node]
while isinstance(replacement, DraftGroupNode) and replacement in group_alias:
replacement = group_alias[replacement]
# Remove consumer from the group node
try:
group_node._connected_output_nodes[conn.port_index].remove(
draft_node
)
except (KeyError, ValueError):
pass
# Register consumer on the replacement node
replacement._connected_output_nodes[conn.port_index].append(
draft_node
)
conn.node = replacement
# Also ensure producers no longer list the dropped group nodes. We
# already added the replacement node as a consumer when we rewired the
# input ports above, so simply drop the group node entries here.
for producer in all_draft_nodes:
for port_index, consumers in producer._connected_output_nodes.items():
producer._connected_output_nodes[port_index] = [
c
for c in consumers
if not isinstance(c, DraftGroupNode)
]
# ======== Phase 1 ========
# Replace all OutputPortRefPlaceholders with actual DraftOutputPortRefs
# OutputPortRefPlaceholders are generated when using a forward reference
# on the FlowHDLView context.
for unknown_node in all_draft_nodes:
draft_node = cast(
DraftNode[Unpack[tuple[object, ...]], tuple[object, ...]], unknown_node
)
# DraftInputPorts can have OutputPortRefPlaceholders or DraftOutputPortRefs
# Step 1) Replace placholders with drafts
for input_port_index, input_port in draft_node._input_ports.items():
if input_port.connected_output is None:
if input_port.default_value != inspect.Parameter.empty:
logger.info(
f"{draft_node.input(input_port_index)} is not connected but has a default value"
)
continue
else:
# TODO: Use the same underlined format as supernode.py
raise AttributeError(
f"{draft_node.input(input_port_index)} is not connected and has no default value"
)
connected_output = input_port.connected_output
if isinstance(connected_output, OutputPortRefPlaceholder):
# validate that the placeholder has been defined on the FlowHDL instance
if connected_output.node.name not in self._nodes:
raise AttributeError(
(
f"Node {connected_output.node.name} is referenced, but has not been defined. "
f"Cannot connect {input_port} to non-existent node {connected_output.node.name}"
)
)
output_source_node = self._nodes[connected_output.node.name]
# if the placeholder has been defined on the FlowHDL instance but is not a DraftNode, raise an error
if not isinstance(output_source_node, DraftNode):
raise AttributeError(
(
f"Attribute {connected_output.node.name} is not a DraftNode. "
f"Cannot connect {draft_node} to non-DraftNode {connected_output.node.name}"
)
)
# the placeholder was defined on the FlowHDL instance and is a DraftNode, so connect the nodes
logger.debug(f"Connecting {output_source_node} to {input_port}")
output_source_node.output(
input_port.connected_output.port_index
).connect(draft_node.input(input_port_index))
# ======== Phase 2 ========
# Now that all OutputPortRefPlaceholders have been replaced with
# DraftOutputPortRefs, we wrap each draft node in a blank finalized node
# and register it with the flow.
for draft_node in clean_draft_nodes:
finalized_node = draft_node._blank_finalized()
finalized_nodes[draft_node] = finalized_node
self._on_register_finalized_node(finalized_node)
if finalize_connections:
# ======== Phase 3 ========
# Now that the finalized nodes exist, we can finalize wire up the connections.
for draft_node, finalized_node in finalized_nodes.items():
finalized_node._input_ports = {
index: draft_input_port._finalize(index, finalized_nodes)
for index, draft_input_port in draft_node._input_ports.items()
}
finalized_node._connected_output_nodes = {
index: [
finalized_nodes[connected_draft]
for connected_draft in connected_drafts
]
for index, connected_drafts in draft_node._connected_output_nodes.items()
}
# ======== Phase 4 ========
# Replace all DraftNodes in self._nodes with their finalized counterparts.
for name, obj in self._nodes.items():
if isinstance(obj, DraftNode):
target = obj
while isinstance(target, DraftGroupNode) and target in group_alias:
target = group_alias[target]
self._nodes[name] = finalized_nodes[target]
self._is_finalized = True
self._child_results = []
logger.debug("Finished Finalizing FlowHDL into Flow")
return FlowHDLView.FinalizationResult(nodes=dict(self._nodes), finalized_nodes=finalized_nodes)