Skip to main content

dfir_lang/graph/
meta_graph.rs

1#![warn(missing_docs)]
2
3extern crate proc_macro;
4
5use std::collections::{BTreeMap, BTreeSet};
6use std::fmt::Debug;
7use std::iter::FusedIterator;
8
9use itertools::Itertools;
10use proc_macro2::{Ident, Literal, Span, TokenStream};
11use quote::{ToTokens, format_ident, quote, quote_spanned};
12use serde::{Deserialize, Serialize};
13use slotmap::{Key, SecondaryMap, SlotMap, SparseSecondaryMap};
14use syn::spanned::Spanned;
15
16use super::graph_write::{Dot, GraphWrite, Mermaid};
17use super::ops::{
18    DelayType, OPERATORS, OperatorWriteOutput, WriteContextArgs, find_op_op_constraints,
19    null_write_iterator_fn,
20};
21use super::{
22    CONTEXT, Color, DiMulGraph, GRAPH, GraphEdgeId, GraphLoopId, GraphNode, GraphNodeId,
23    GraphSubgraphId, HANDOFF_NODE_STR, HandoffKind, MODULE_BOUNDARY_NODE_STR, OperatorInstance,
24    PortIndexValue, SINGLETON_SLOT_NODE_STR, Varname, change_spans, get_operator_generics,
25};
26use crate::diagnostic::{Diagnostic, Diagnostics, Level};
27use crate::pretty_span::{PrettyRowCol, PrettySpan};
28use crate::process_singletons;
29
/// A resolved handoff reference: the target node ID plus mutability and access group info.
///
/// One of these is stored per reference of a referencing node; see
/// [`DfirGraph::node_handoff_reference_groups`] for how they are grouped by target node and
/// access group.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResolvedHandoffRef {
    /// The resolved target node ID (`None` if unresolved/error).
    ///
    /// References with `None` here are skipped by
    /// [`DfirGraph::node_handoff_reference_groups`].
    pub node_id: Option<GraphNodeId>,
    /// Whether this is a mutable reference (`#mut var`).
    pub is_mut: bool,
    /// Optional access group for ordering (`#{N} var`).
    pub access_group: Option<u32>,
}
40
/// An abstract "meta graph" representation of a DFIR graph.
///
/// Can be with or without subgraph partitioning, stratification, and handoff insertion. This is
/// the meta graph used for generating Rust source code in macros from DFIR syntax.
///
/// This struct has a lot of methods for manipulating the graph, vaguely grouped together in
/// separate `impl` blocks. You might notice a few particularly specific arbitrary-seeming methods
/// in here--those are just what was needed for the compilation algorithms. If you need another
/// method then add it.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct DfirGraph {
    /// Each node type (operator or handoff).
    nodes: SlotMap<GraphNodeId, GraphNode>,

    /// Instance data corresponding to each operator node.
    /// This field will be empty after deserialization (it is skipped by serde).
    #[serde(skip)]
    operator_instances: SecondaryMap<GraphNodeId, OperatorInstance>,
    /// Debugging/tracing tag for each operator node.
    operator_tag: SecondaryMap<GraphNodeId, String>,
    /// Graph data structure (two-way adjacency list).
    graph: DiMulGraph<GraphNodeId, GraphEdgeId>,
    /// Source (output) and destination (input) port for each edge, in that order.
    ports: SecondaryMap<GraphEdgeId, (PortIndexValue, PortIndexValue)>,

    /// Which loop a node belongs to (or none for top-level).
    node_loops: SecondaryMap<GraphNodeId, GraphLoopId>,
    /// Which nodes belong to each loop.
    loop_nodes: SlotMap<GraphLoopId, Vec<GraphNodeId>>,
    /// For the loop, what is its parent (`None` for top-level).
    loop_parent: SparseSecondaryMap<GraphLoopId, GraphLoopId>,
    /// What loops are at the root.
    root_loops: Vec<GraphLoopId>,
    /// For the loop, what are its child loops.
    loop_children: SecondaryMap<GraphLoopId, Vec<GraphLoopId>>,

    /// Which subgraph each node belongs to.
    node_subgraph: SecondaryMap<GraphNodeId, GraphSubgraphId>,

    /// Which nodes belong to each subgraph.
    subgraph_nodes: SlotMap<GraphSubgraphId, Vec<GraphNodeId>>,
    /// Subgraph IDs in topological sort order (set during partitioning).
    subgraph_toposort: Vec<GraphSubgraphId>,

    /// Resolved handoff (varname) references, per referencing node.
    node_handoff_references: SparseSecondaryMap<GraphNodeId, Vec<ResolvedHandoffRef>>,
    /// What variable name each graph node belongs to (if any). For debugging (graph writing) purposes only.
    node_varnames: SparseSecondaryMap<GraphNodeId, Varname>,

    /// Delay type for handoff nodes that represent tick-boundary back-edges.
    /// Set by `order_subgraphs` for `defer_tick` / `defer_tick_lazy`, either on handoff nodes
    /// it injects or on existing handoff nodes that it marks as tick-boundary back-edges.
    handoff_delay_type: SparseSecondaryMap<GraphNodeId, DelayType>,
}
95
96/// Basic methods.
97impl DfirGraph {
98    /// Create a new empty graph.
99    pub fn new() -> Self {
100        Default::default()
101    }
102}
103
104/// Node methods.
105impl DfirGraph {
    /// Get the [`GraphNode`] for the given `node_id`.
    ///
    /// This returns only the node itself; use [`Self::node_op_inst`] to get the corresponding
    /// `OperatorInstance` (if any).
    ///
    /// # Panics
    /// Panics with "Node not found." if `node_id` is not in the graph.
    pub fn node(&self, node_id: GraphNodeId) -> &GraphNode {
        self.nodes.get(node_id).expect("Node not found.")
    }
110
    /// Get the `OperatorInstance` for a given node. Node must be an operator and have an
    /// `OperatorInstance` present, otherwise will return `None`.
    ///
    /// Note that no operator instances will be present after deserialization, since the
    /// backing map is skipped by serde.
    pub fn node_op_inst(&self, node_id: GraphNodeId) -> Option<&OperatorInstance> {
        self.operator_instances.get(node_id)
    }
118
    /// Get the debug variable name attached to a graph node, or `None` if no varname was
    /// recorded for `node_id`.
    pub fn node_varname(&self, node_id: GraphNodeId) -> Option<&Varname> {
        self.node_varnames.get(node_id)
    }
123
    /// Get the subgraph a node belongs to, or `None` if the node has not been assigned to a
    /// subgraph.
    pub fn node_subgraph(&self, node_id: GraphNodeId) -> Option<GraphSubgraphId> {
        self.node_subgraph.get(node_id).copied()
    }
128
    /// Degree into a node, i.e. the number of predecessors.
    ///
    /// Delegates to the underlying graph's `degree_in`.
    pub fn node_degree_in(&self, node_id: GraphNodeId) -> usize {
        self.graph.degree_in(node_id)
    }
133
    /// Degree out of a node, i.e. the number of successors.
    ///
    /// Delegates to the underlying graph's `degree_out`.
    pub fn node_degree_out(&self, node_id: GraphNodeId) -> usize {
        self.graph.degree_out(node_id)
    }
138
    /// Successors, iterator of `(GraphEdgeId, GraphNodeId)` of outgoing edges.
    ///
    /// Each item pairs an outgoing edge of `src` with the node on the other end of that edge.
    pub fn node_successors(
        &self,
        src: GraphNodeId,
    ) -> impl '_
    + DoubleEndedIterator<Item = (GraphEdgeId, GraphNodeId)>
    + ExactSizeIterator
    + FusedIterator
    + Clone
    + Debug {
        self.graph.successors(src)
    }
151
    /// Predecessors, iterator of `(GraphEdgeId, GraphNodeId)` of incoming edges.
    ///
    /// Each item pairs an incoming edge of `dst` with the node on the other end of that edge.
    pub fn node_predecessors(
        &self,
        dst: GraphNodeId,
    ) -> impl '_
    + DoubleEndedIterator<Item = (GraphEdgeId, GraphNodeId)>
    + ExactSizeIterator
    + FusedIterator
    + Clone
    + Debug {
        self.graph.predecessors(dst)
    }
164
    /// Successor edges, iterator of `GraphEdgeId` of outgoing edges.
    ///
    /// Like [`Self::node_successors`] but yields only the edge IDs.
    pub fn node_successor_edges(
        &self,
        src: GraphNodeId,
    ) -> impl '_
    + DoubleEndedIterator<Item = GraphEdgeId>
    + ExactSizeIterator
    + FusedIterator
    + Clone
    + Debug {
        self.graph.successor_edges(src)
    }
177
    /// Predecessor edges, iterator of `GraphEdgeId` of incoming edges.
    ///
    /// Like [`Self::node_predecessors`] but yields only the edge IDs.
    pub fn node_predecessor_edges(
        &self,
        dst: GraphNodeId,
    ) -> impl '_
    + DoubleEndedIterator<Item = GraphEdgeId>
    + ExactSizeIterator
    + FusedIterator
    + Clone
    + Debug {
        self.graph.predecessor_edges(dst)
    }
190
    /// Successor nodes, iterator of `GraphNodeId`.
    ///
    /// Like [`Self::node_successors`] but yields only the node IDs.
    pub fn node_successor_nodes(
        &self,
        src: GraphNodeId,
    ) -> impl '_
    + DoubleEndedIterator<Item = GraphNodeId>
    + ExactSizeIterator
    + FusedIterator
    + Clone
    + Debug {
        self.graph.successor_vertices(src)
    }
203
    /// Predecessor nodes, iterator of `GraphNodeId`.
    ///
    /// Like [`Self::node_predecessors`] but yields only the node IDs.
    pub fn node_predecessor_nodes(
        &self,
        dst: GraphNodeId,
    ) -> impl '_
    + DoubleEndedIterator<Item = GraphNodeId>
    + ExactSizeIterator
    + FusedIterator
    + Clone
    + Debug {
        self.graph.predecessor_vertices(dst)
    }
216
    /// Iterator of node IDs `GraphNodeId`, covering every node currently in the graph.
    pub fn node_ids(&self) -> slotmap::basic::Keys<'_, GraphNodeId, GraphNode> {
        self.nodes.keys()
    }
221
    /// Iterator over `(GraphNodeId, &GraphNode)` pairs, covering every node currently in the
    /// graph.
    pub fn nodes(&self) -> slotmap::basic::Iter<'_, GraphNodeId, GraphNode> {
        self.nodes.iter()
    }
226
227    /// Insert a node, assigning the given varname.
228    pub fn insert_node(
229        &mut self,
230        node: GraphNode,
231        varname_opt: Option<Ident>,
232        loop_opt: Option<GraphLoopId>,
233    ) -> GraphNodeId {
234        let node_id = self.nodes.insert(node);
235        if let Some(varname) = varname_opt {
236            self.node_varnames.insert(node_id, Varname(varname));
237        }
238        if let Some(loop_id) = loop_opt {
239            self.node_loops.insert(node_id, loop_id);
240            self.loop_nodes[loop_id].push(node_id);
241        }
242        node_id
243    }
244
    /// Insert an operator instance for the given node.
    ///
    /// # Panics
    /// Panics if `node_id` is not an operator node in the graph, or if an operator instance
    /// is already set for it.
    pub fn insert_node_op_inst(&mut self, node_id: GraphNodeId, op_inst: OperatorInstance) {
        // Only operator nodes may carry an `OperatorInstance`.
        assert!(matches!(
            self.nodes.get(node_id),
            Some(GraphNode::Operator(_))
        ));
        let old_inst = self.operator_instances.insert(node_id, op_inst);
        // Overwriting an existing instance is treated as a bug.
        assert!(old_inst.is_none());
    }
254
    /// Assign all operator instances if not set. Write diagnostic messages/errors into `diagnostics`.
    ///
    /// Operator nodes that already have an instance are left untouched. Operator nodes named
    /// `handoff`, `singleton`, or `optional` are pseudo-operators: they get no operator
    /// instance and are instead replaced in-place by a `GraphNode::Handoff` of the matching
    /// `HandoffKind`. Operators with an unknown name produce an error diagnostic and are skipped.
    pub fn insert_node_op_insts_all(&mut self, diagnostics: &mut Diagnostics) {
        // Handle all nodes in two phases, since the helper methods take total ownership of `&self`.
        // Possible to do in one phase, but would require accessing fields directly for partial mutable ownership.

        // Collect operator instances, then assign.
        let mut op_insts = Vec::new();
        // Collect nodes that should be lowered to handoffs (the `handoff()`/`singleton()`/`optional()` pseudo-operators).
        let mut handoff_nodes: Vec<(GraphNodeId, HandoffKind, Span)> = Vec::new();

        for (node_id, node) in self.nodes() {
            let GraphNode::Operator(operator) = node else {
                continue;
            };
            // Already has an instance: nothing to do for this node.
            if self.node_op_inst(node_id).is_some() {
                continue;
            };

            // Recognize `handoff()`/`singleton()`/`optional()` pseudo-operators and lower to GraphNode::Handoff.
            let handoff_kind = match &*operator.name_string() {
                "handoff" => Some(HandoffKind::Vec),
                "singleton" => Some(HandoffKind::Singleton),
                "optional" => Some(HandoffKind::Optional),
                _ => None,
            };
            if let Some(kind) = handoff_kind {
                if !operator.args.is_empty() {
                    diagnostics.push(Diagnostic::spanned(
                        operator.path.span(),
                        Level::Error,
                        format!("`{}` takes no arguments.", operator.name_string()),
                    ));
                }
                if operator.type_arguments().is_some() {
                    diagnostics.push(Diagnostic::spanned(
                        operator.path.span(),
                        Level::Error,
                        format!("`{}` takes no generic arguments.", operator.name_string()),
                    ));
                }
                // The node is still queued for lowering even if errors were reported above.
                handoff_nodes.push((node_id, kind, operator.path.span()));
                continue;
            }

            // Op constraints.
            let Some(op_constraints) = find_op_op_constraints(operator) else {
                diagnostics.push(Diagnostic::spanned(
                    operator.path.span(),
                    Level::Error,
                    format!("Unknown operator `{}`", operator.name_string()),
                ));
                continue;
            };

            // Input and output ports.
            let (input_ports, output_ports) = {
                // Collect input arguments (predecessors): the destination-side port of each incoming edge.
                let mut input_edges: Vec<(&PortIndexValue, GraphNodeId)> = self
                    .node_predecessors(node_id)
                    .map(|(edge_id, pred_id)| (self.edge_ports(edge_id).1, pred_id))
                    .collect();
                // Ensure sorted by port index.
                input_edges.sort();
                let input_ports: Vec<PortIndexValue> = input_edges
                    .into_iter()
                    .map(|(port, _pred)| port)
                    .cloned()
                    .collect();

                // Collect output arguments (successors).
                let mut output_edges: Vec<(&PortIndexValue, GraphNodeId)> = self
                    .node_successors(node_id)
                    .map(|(edge_id, succ)| (self.edge_ports(edge_id).0, succ))
                    .collect();
                // Ensure sorted by port index.
                output_edges.sort();
                let output_ports: Vec<PortIndexValue> = output_edges
                    .into_iter()
                    .map(|(port, _succ)| port)
                    .cloned()
                    .collect();

                (input_ports, output_ports)
            };

            // Generic arguments.
            let generics = get_operator_generics(diagnostics, operator);
            // Generic argument errors.
            {
                // Span of `generic_args` (if it exists), otherwise span of the operator name.
                let generics_span = generics
                    .generic_args
                    .as_ref()
                    .map(Spanned::span)
                    .unwrap_or_else(|| operator.path.span());

                if !op_constraints
                    .persistence_args
                    .contains(&generics.persistence_args.len())
                {
                    diagnostics.push(Diagnostic::spanned(
                        generics.persistence_args_span().unwrap_or(generics_span),
                        Level::Error,
                        format!(
                            "`{}` should have {} persistence lifetime arguments, actually has {}.",
                            op_constraints.name,
                            op_constraints.persistence_args.human_string(),
                            generics.persistence_args.len()
                        ),
                    ));
                }
                if !op_constraints.type_args.contains(&generics.type_args.len()) {
                    diagnostics.push(Diagnostic::spanned(
                        generics.type_args_span().unwrap_or(generics_span),
                        Level::Error,
                        format!(
                            "`{}` should have {} generic type arguments, actually has {}.",
                            op_constraints.name,
                            op_constraints.type_args.human_string(),
                            generics.type_args.len()
                        ),
                    ));
                }
            }

            // The instance is recorded even when generic-argument errors were reported above.
            op_insts.push((
                node_id,
                OperatorInstance {
                    op_constraints,
                    input_ports,
                    output_ports,
                    singletons_referenced: operator.singletons_referenced.clone(),
                    generics,
                    arguments_pre: operator.args.clone(),
                    arguments_raw: operator.args_raw.clone(),
                },
            ));
        }

        // Second phase: now that the shared borrow of `self` has ended, apply the collected changes.
        for (node_id, op_inst) in op_insts {
            self.insert_node_op_inst(node_id, op_inst);
        }

        // Replace pseudo-operator nodes with GraphNode::Handoff.
        for (node_id, kind, span) in handoff_nodes {
            self.nodes[node_id] = GraphNode::Handoff {
                kind,
                src_span: span,
                dst_span: span,
            };
        }
    }
406
407    /// Inserts a node between two existing nodes connected by the given `edge_id`.
408    ///
409    /// `edge`: (src, dst, dst_idx)
410    ///
411    /// Before: A (src) ------------> B (dst)
412    /// After:  A (src) -> X (new) -> B (dst)
413    ///
414    /// Returns the ID of X & ID of edge OUT of X.
415    ///
416    /// Note that both the edges will be new and `edge_id` will be removed. Both new edges will
417    /// get the edge type of the original edge.
418    pub fn insert_intermediate_node(
419        &mut self,
420        edge_id: GraphEdgeId,
421        new_node: GraphNode,
422    ) -> (GraphNodeId, GraphEdgeId) {
423        let span = Some(new_node.span());
424
425        // Make corresponding operator instance (if `node` is an operator).
426        let op_inst_opt = 'oc: {
427            let GraphNode::Operator(operator) = &new_node else {
428                break 'oc None;
429            };
430            let Some(op_constraints) = find_op_op_constraints(operator) else {
431                break 'oc None;
432            };
433            let (input_port, output_port) = self.ports.get(edge_id).cloned().unwrap();
434
435            let mut dummy_diagnostics = Diagnostics::new();
436            let generics = get_operator_generics(&mut dummy_diagnostics, operator);
437            assert!(dummy_diagnostics.is_empty());
438
439            Some(OperatorInstance {
440                op_constraints,
441                input_ports: vec![input_port],
442                output_ports: vec![output_port],
443                singletons_referenced: operator.singletons_referenced.clone(),
444                generics,
445                arguments_pre: operator.args.clone(),
446                arguments_raw: operator.args_raw.clone(),
447            })
448        };
449
450        // Insert new `node`.
451        let node_id = self.nodes.insert(new_node);
452        // Insert corresponding `OperatorInstance` if applicable.
453        if let Some(op_inst) = op_inst_opt {
454            self.operator_instances.insert(node_id, op_inst);
455        }
456        // Update edges to insert node within `edge_id`.
457        let (e0, e1) = self
458            .graph
459            .insert_intermediate_vertex(node_id, edge_id)
460            .unwrap();
461
462        // Update corresponding ports.
463        let (src_idx, dst_idx) = self.ports.remove(edge_id).unwrap();
464        self.ports
465            .insert(e0, (src_idx, PortIndexValue::Elided(span)));
466        self.ports
467            .insert(e1, (PortIndexValue::Elided(span), dst_idx));
468
469        (node_id, e1)
470    }
471
472    /// Remove the node `node_id` but preserves and connects the single predecessor and single successor.
473    /// Panics if the node does not have exactly one predecessor and one successor, or is not in the graph.
474    pub fn remove_intermediate_node(&mut self, node_id: GraphNodeId) {
475        assert_eq!(
476            1,
477            self.node_degree_in(node_id),
478            "Removed intermediate node must have one predecessor"
479        );
480        assert_eq!(
481            1,
482            self.node_degree_out(node_id),
483            "Removed intermediate node must have one successor"
484        );
485        assert!(
486            self.node_subgraph.is_empty() && self.subgraph_nodes.is_empty(),
487            "Should not remove intermediate node after subgraph partitioning"
488        );
489
490        assert!(self.nodes.remove(node_id).is_some());
491        let (new_edge_id, (pred_edge_id, succ_edge_id)) =
492            self.graph.remove_intermediate_vertex(node_id).unwrap();
493        self.operator_instances.remove(node_id);
494        self.node_varnames.remove(node_id);
495
496        let (src_port, _) = self.ports.remove(pred_edge_id).unwrap();
497        let (_, dst_port) = self.ports.remove(succ_edge_id).unwrap();
498        self.ports.insert(new_edge_id, (src_port, dst_port));
499    }
500
501    /// Helper method: determine the "color" (pull vs push) of a node based on its in and out degree,
502    /// excluding reference edges. If linear (1 in, 1 out), color is `None`, indicating it can be
503    /// either push or pull.
504    ///
505    /// Note that this does NOT consider `DelayType` barriers (which generally implies `Pull`).
506    pub(crate) fn node_color(&self, node_id: GraphNodeId) -> Option<Color> {
507        if matches!(self.node(node_id), GraphNode::Handoff { .. }) {
508            return Some(Color::Hoff);
509        }
510
511        // TODO(shadaj): this is a horrible hack
512        if let GraphNode::Operator(op) = self.node(node_id)
513            && (op.name_string() == "resolve_futures_blocking"
514                || op.name_string() == "resolve_futures_blocking_ordered")
515        {
516            return Some(Color::Push);
517        }
518
519        // In-degree, excluding ref-edges.
520        let inn_degree = self.node_predecessor_nodes(node_id).len();
521        // Out-degree excluding ref-edges.
522        let out_degree = self.node_successor_nodes(node_id).len();
523
524        match (inn_degree, out_degree) {
525            (0, 0) => None, // Generally should not happen, "Degenerate subgraph detected".
526            (0, 1) => Some(Color::Pull),
527            (1, 0) => Some(Color::Push),
528            (1, 1) => None, // Linear, can be either push or pull.
529            (_many, 0 | 1) => Some(Color::Pull),
530            (0 | 1, _many) => Some(Color::Push),
531            (_many, _to_many) => Some(Color::Comp),
532        }
533    }
534
    /// Set the operator tag (for debugging/tracing) of `node_id`, replacing any tag that was
    /// previously stored for it.
    pub fn set_operator_tag(&mut self, node_id: GraphNodeId, tag: String) {
        self.operator_tag.insert(node_id, tag);
    }
539}
540
541/// Handoff references.
542impl DfirGraph {
543    /// Set the handoff references for the `node_id` operator. Each reference corresponds to the
544    /// same index in the [`crate::parse::Operator::singletons_referenced`] vec.
545    pub fn set_node_handoff_references(
546        &mut self,
547        node_id: GraphNodeId,
548        singletons_referenced: Vec<ResolvedHandoffRef>,
549    ) -> Option<Vec<ResolvedHandoffRef>> {
550        self.node_handoff_references
551            .insert(node_id, singletons_referenced)
552    }
553
554    /// Gets the handoff references for a node. Returns an empty slice for non-operators and
555    /// operators that do not reference handoffs.
556    pub fn node_handoff_references(&self, node_id: GraphNodeId) -> &[ResolvedHandoffRef] {
557        self.node_handoff_references
558            .get(node_id)
559            .map(std::ops::Deref::deref)
560            .unwrap_or_default()
561    }
562
563    /// Collect all refs, grouped by the handoff they're pointing at, then by the access group idx `Option<u32>`.
564    pub fn node_handoff_reference_groups(&self) -> NodeHandoffReferenceGroups<'_> {
565        let mut handoff_references = NodeHandoffReferenceGroups::new();
566        for node_id in self.node_ids() {
567            if let GraphNode::Operator(operator) = self.node(node_id) {
568                let resolved = self.node_handoff_references(node_id);
569                for (resolved_ref, ref_token) in
570                    resolved.iter().zip(operator.singletons_referenced.iter())
571                {
572                    if let Some(target_nid) = resolved_ref.node_id {
573                        handoff_references
574                            .entry(target_nid)
575                            .or_default()
576                            .entry(resolved_ref.access_group)
577                            .or_default()
578                            .push((node_id, resolved_ref, ref_token.span()));
579                    }
580                }
581            }
582        }
583        handoff_references
584    }
585}
586
/// Handoff references grouped per target handoff node, in turn grouped by access group.
///
/// Map: handoff_node_id -> access_group -> list of
/// (source `GraphNodeId`, `ResolvedHandoffRef`, `#ref` span).
///
/// Built by [`DfirGraph::node_handoff_reference_groups`]; the `ResolvedHandoffRef`s are
/// borrowed from the graph for `'a`.
pub type NodeHandoffReferenceGroups<'a> =
    BTreeMap<GraphNodeId, BTreeMap<Option<u32>, Vec<(GraphNodeId, &'a ResolvedHandoffRef, Span)>>>;
591
592/// Module methods.
593impl DfirGraph {
594    /// When modules are imported into a flat graph, they come with an input and output ModuleBoundary node.
595    /// The partitioner doesn't understand these nodes and will panic if it encounters them.
596    /// merge_modules removes them from the graph, stitching the input and ouput sides of the ModuleBondaries based on their ports
597    /// For example:
598    ///     source_iter([]) -> \[myport\]ModuleBoundary(input)\[my_port\] -> map(|x| x) -> ModuleBoundary(output) -> null();
599    /// in the above eaxmple, the \[myport\] port will be used to connect the source_iter with the map that is inside of the module.
600    /// The output module boundary has elided ports, this is also used to match up the input/output across the module boundary.
601    pub fn merge_modules(&mut self) -> Result<(), Diagnostic> {
602        let mod_bound_nodes = self
603            .nodes()
604            .filter(|(_nid, node)| matches!(node, GraphNode::ModuleBoundary { .. }))
605            .map(|(nid, _node)| nid)
606            .collect::<Vec<_>>();
607
608        for mod_bound_node in mod_bound_nodes {
609            self.remove_module_boundary(mod_bound_node)?;
610        }
611
612        Ok(())
613    }
614
615    /// see `merge_modules`
616    /// This function removes a singular module boundary from the graph and performs the necessary stitching to fix the graph afterward.
617    /// `merge_modules` calls this function for each module boundary in the graph.
618    fn remove_module_boundary(&mut self, mod_bound_node: GraphNodeId) -> Result<(), Diagnostic> {
619        assert!(
620            self.node_subgraph.is_empty() && self.subgraph_nodes.is_empty(),
621            "Should not remove intermediate node after subgraph partitioning"
622        );
623
624        let mut mod_pred_ports = BTreeMap::new();
625        let mut mod_succ_ports = BTreeMap::new();
626
627        for mod_out_edge in self.node_predecessor_edges(mod_bound_node) {
628            let (pred_port, succ_port) = self.edge_ports(mod_out_edge);
629            mod_pred_ports.insert(succ_port.clone(), (mod_out_edge, pred_port.clone()));
630        }
631
632        for mod_inn_edge in self.node_successor_edges(mod_bound_node) {
633            let (pred_port, succ_port) = self.edge_ports(mod_inn_edge);
634            mod_succ_ports.insert(pred_port.clone(), (mod_inn_edge, succ_port.clone()));
635        }
636
637        if mod_pred_ports.keys().collect::<BTreeSet<_>>()
638            != mod_succ_ports.keys().collect::<BTreeSet<_>>()
639        {
640            // get module boundary node
641            let GraphNode::ModuleBoundary { input, import_expr } = self.node(mod_bound_node) else {
642                panic!();
643            };
644
645            if *input {
646                return Err(Diagnostic {
647                    span: *import_expr,
648                    level: Level::Error,
649                    message: format!(
650                        "The ports into the module did not match. input: {:?}, expected: {:?}",
651                        mod_pred_ports.keys().map(|x| x.to_string()).join(", "),
652                        mod_succ_ports.keys().map(|x| x.to_string()).join(", ")
653                    ),
654                });
655            } else {
656                return Err(Diagnostic {
657                    span: *import_expr,
658                    level: Level::Error,
659                    message: format!(
660                        "The ports out of the module did not match. output: {:?}, expected: {:?}",
661                        mod_succ_ports.keys().map(|x| x.to_string()).join(", "),
662                        mod_pred_ports.keys().map(|x| x.to_string()).join(", "),
663                    ),
664                });
665            }
666        }
667
668        for (port, (pred_edge, pred_port)) in mod_pred_ports {
669            let (succ_edge, succ_port) = mod_succ_ports.remove(&port).unwrap();
670
671            let (src, _) = self.edge(pred_edge);
672            let (_, dst) = self.edge(succ_edge);
673            self.remove_edge(pred_edge);
674            self.remove_edge(succ_edge);
675
676            let new_edge_id = self.graph.insert_edge(src, dst);
677            self.ports.insert(new_edge_id, (pred_port, succ_port));
678        }
679
680        self.graph.remove_vertex(mod_bound_node);
681        self.nodes.remove(mod_bound_node);
682
683        Ok(())
684    }
685}
686
687/// Edge methods.
688impl DfirGraph {
689    /// Get the `src` and `dst` for an edge: `(src GraphNodeId, dst GraphNodeId)`.
690    pub fn edge(&self, edge_id: GraphEdgeId) -> (GraphNodeId, GraphNodeId) {
691        let (src, dst) = self.graph.edge(edge_id).expect("Edge not found.");
692        (src, dst)
693    }
694
695    /// Get the source and destination ports for an edge: `(src &PortIndexValue, dst &PortIndexValue)`.
696    pub fn edge_ports(&self, edge_id: GraphEdgeId) -> (&PortIndexValue, &PortIndexValue) {
697        let (src_port, dst_port) = self.ports.get(edge_id).expect("Edge not found.");
698        (src_port, dst_port)
699    }
700
701    /// Iterator of all edge IDs `GraphEdgeId`.
702    pub fn edge_ids(&self) -> slotmap::basic::Keys<'_, GraphEdgeId, (GraphNodeId, GraphNodeId)> {
703        self.graph.edge_ids()
704    }
705
706    /// Iterator over all edges: `(GraphEdgeId, (src GraphNodeId, dst GraphNodeId))`.
707    pub fn edges(
708        &self,
709    ) -> impl '_
710    + ExactSizeIterator<Item = (GraphEdgeId, (GraphNodeId, GraphNodeId))>
711    + FusedIterator
712    + Clone
713    + Debug {
714        self.graph.edges()
715    }
716
717    /// Insert an edge between nodes thru the given ports.
718    pub fn insert_edge(
719        &mut self,
720        src: GraphNodeId,
721        src_port: PortIndexValue,
722        dst: GraphNodeId,
723        dst_port: PortIndexValue,
724    ) -> GraphEdgeId {
725        let edge_id = self.graph.insert_edge(src, dst);
726        self.ports.insert(edge_id, (src_port, dst_port));
727        edge_id
728    }
729
730    /// Removes an edge and its corresponding ports and edge type info.
731    pub fn remove_edge(&mut self, edge: GraphEdgeId) {
732        let (_src, _dst) = self.graph.remove_edge(edge).unwrap();
733        let (_src_port, _dst_port) = self.ports.remove(edge).unwrap();
734    }
735}
736
737/// Subgraph methods.
738impl DfirGraph {
    /// Nodes belonging to the given subgraph.
    ///
    /// # Panics
    /// Panics with "Subgraph not found." if `subgraph_id` is not in the graph.
    pub fn subgraph(&self, subgraph_id: GraphSubgraphId) -> &Vec<GraphNodeId> {
        self.subgraph_nodes
            .get(subgraph_id)
            .expect("Subgraph not found.")
    }
745
    /// Iterator over all subgraph IDs, in the storage order of the subgraph map (not
    /// necessarily the topological order; see [`Self::subgraph_toposort`] for that).
    pub fn subgraph_ids(&self) -> slotmap::basic::Keys<'_, GraphSubgraphId, Vec<GraphNodeId>> {
        self.subgraph_nodes.keys()
    }
750
    /// Subgraph IDs in topological sort order, as last stored via
    /// [`Self::set_subgraph_toposort`]. Empty if no order has been set.
    pub fn subgraph_toposort(&self) -> &[GraphSubgraphId] {
        &self.subgraph_toposort
    }
755
    /// Set the topological sort order for subgraphs, replacing any previously stored order.
    ///
    /// The given `order` is stored as-is; it is not validated here.
    pub fn set_subgraph_toposort(&mut self, order: Vec<GraphSubgraphId>) {
        self.subgraph_toposort = order;
    }
760
761    /// Iterator over all subgraphs, ID and members: `(GraphSubgraphId, Vec<GraphNodeId>)`.
762    pub fn subgraphs(&self) -> slotmap::basic::Iter<'_, GraphSubgraphId, Vec<GraphNodeId>> {
763        self.subgraph_nodes.iter()
764    }
765
766    /// Create a subgraph consisting of `node_ids`. Returns an error if any of the nodes are already in a subgraph.
767    pub fn insert_subgraph(
768        &mut self,
769        node_ids: Vec<GraphNodeId>,
770    ) -> Result<GraphSubgraphId, (GraphNodeId, GraphSubgraphId)> {
771        // Check none are already in subgraphs
772        for &node_id in node_ids.iter() {
773            if let Some(&old_sg_id) = self.node_subgraph.get(node_id) {
774                return Err((node_id, old_sg_id));
775            }
776        }
777        let subgraph_id = self.subgraph_nodes.insert_with_key(|sg_id| {
778            for &node_id in node_ids.iter() {
779                self.node_subgraph.insert(node_id, sg_id);
780            }
781            node_ids
782        });
783
784        Ok(subgraph_id)
785    }
786
787    /// Removes a node from its subgraph. Returns true if the node was in a subgraph.
788    pub fn remove_from_subgraph(&mut self, node_id: GraphNodeId) -> bool {
789        if let Some(old_sg_id) = self.node_subgraph.remove(node_id) {
790            self.subgraph_nodes[old_sg_id].retain(|&other_node_id| other_node_id != node_id);
791            true
792        } else {
793            false
794        }
795    }
796
797    /// Gets the delay type for a handoff node, if set.
798    pub fn handoff_delay_type(&self, node_id: GraphNodeId) -> Option<DelayType> {
799        self.handoff_delay_type.get(node_id).copied()
800    }
801
802    /// Sets the delay type for a handoff node.
803    pub fn set_handoff_delay_type(&mut self, node_id: GraphNodeId, delay_type: DelayType) {
804        self.handoff_delay_type.insert(node_id, delay_type);
805    }
806
807    /// Helper: finds the first index in `subgraph_nodes` where it transitions from pull to push.
808    fn find_pull_to_push_idx(&self, subgraph_nodes: &[GraphNodeId]) -> usize {
809        subgraph_nodes
810            .iter()
811            .position(|&node_id| {
812                self.node_color(node_id)
813                    .is_some_and(|color| Color::Pull != color)
814            })
815            .unwrap_or(subgraph_nodes.len())
816    }
817}
818
819/// Display/output methods.
820impl DfirGraph {
821    /// Helper to generate a deterministic `Ident` for the given node.
822    fn node_as_ident(&self, node_id: GraphNodeId, is_pred: bool) -> Ident {
823        let name = match &self.nodes[node_id] {
824            GraphNode::Operator(_) => format!("op_{:?}", node_id.data()),
825            GraphNode::Handoff {
826                kind: HandoffKind::Vec,
827                ..
828            } => format!(
829                "hoff_{:?}_{}",
830                node_id.data(),
831                if is_pred { "recv" } else { "send" }
832            ),
833            GraphNode::Handoff {
834                kind: HandoffKind::Singleton | HandoffKind::Optional,
835                ..
836            } => format!(
837                "singleton_{:?}_{}",
838                node_id.data(),
839                if is_pred { "recv" } else { "send" }
840            ),
841            GraphNode::ModuleBoundary { .. } => panic!(),
842        };
843        let span = match (is_pred, &self.nodes[node_id]) {
844            (_, GraphNode::Operator(operator)) => operator.span(),
845            (true, &GraphNode::Handoff { src_span, .. }) => src_span,
846            (false, &GraphNode::Handoff { dst_span, .. }) => dst_span,
847            (_, GraphNode::ModuleBoundary { .. }) => panic!(),
848        };
849        Ident::new(&name, span)
850    }
851
852    /// Helper to generate the main buffer `Ident` for a handoff node.
853    fn hoff_buf_ident(&self, hoff_id: GraphNodeId, span: Span) -> Ident {
854        Ident::new(&format!("hoff_{:?}_buf", hoff_id.data()), span)
855    }
856
857    /// Helper to generate the back (double-buffer) `Ident` for a handoff node.
858    fn hoff_back_ident(&self, hoff_id: GraphNodeId, span: Span) -> Ident {
859        Ident::new(&format!("hoff_{:?}_back", hoff_id.data()), span)
860    }
861
862    /// Resolve the handoff references via [`Self::node_handoff_references`] for the given `node_id`.
863    /// Returns token streams for each reference:
864    /// - For HandoffKind::Singleton: `buf.as_ref().unwrap()` (shared, `&T`) or
865    ///   `buf.as_mut().unwrap()` (mutable, `&mut T`)
866    /// - For HandoffKind::Optional: `&buf` (shared, `&Option<T>`) or
867    ///   `&mut buf` (mutable, `&mut Option<T>`)
868    /// - For HandoffKind::Vec: `&buf` (shared, `&Vec<T>`) or
869    ///   `&mut buf` (mutable, `&mut Vec<T>`)
870    fn helper_resolve_singletons(&self, node_id: GraphNodeId, span: Span) -> Vec<TokenStream> {
871        self.node_handoff_references(node_id)
872            .iter()
873            .map(|resolved_ref| {
874                // TODO(mingwei): this `expect` should be caught in error checking
875                let ref_node_id = resolved_ref
876                    .node_id
877                    .expect("Expected singleton to be resolved but was not, this is a bug.");
878                let is_mut = resolved_ref.is_mut;
879                match self.node(ref_node_id) {
880                    GraphNode::Handoff {
881                        kind: HandoffKind::Singleton,
882                        ..
883                    } => {
884                        let buf_ident = self.hoff_buf_ident(ref_node_id, span);
885                        if is_mut {
886                            quote_spanned! {span=> #buf_ident.as_mut().unwrap() }
887                        } else {
888                            quote_spanned! {span=> #buf_ident.as_ref().unwrap() }
889                        }
890                    }
891                    GraphNode::Handoff {
892                        kind: HandoffKind::Optional | HandoffKind::Vec,
893                        ..
894                    } => {
895                        let buf_ident = self.hoff_buf_ident(ref_node_id, span);
896                        if is_mut {
897                            quote_spanned! {span=> &mut #buf_ident }
898                        } else {
899                            quote_spanned! {span=> &#buf_ident }
900                        }
901                    }
902                    _ => {
903                        unreachable!("Only handoff nodes should be reachable as handoff references")
904                    }
905                }
906            })
907            .collect::<Vec<_>>()
908    }
909
910    /// Returns each subgraph's receive and send handoffs.
911    /// `Map<GraphSubgraphId, (recv handoffs, send handoffs)>`
912    fn helper_collect_subgraph_handoffs(
913        &self,
914    ) -> SecondaryMap<GraphSubgraphId, (Vec<GraphNodeId>, Vec<GraphNodeId>)> {
915        // Get data on handoff src and dst subgraphs.
916        let mut subgraph_handoffs: SecondaryMap<
917            GraphSubgraphId,
918            (Vec<GraphNodeId>, Vec<GraphNodeId>),
919        > = self
920            .subgraph_nodes
921            .keys()
922            .map(|k| (k, Default::default()))
923            .collect();
924
925        // For each handoff/singleton node, add it to the `send`/`recv` lists for the corresponding subgraphs.
926        for (hoff_id, hoff) in self.nodes() {
927            if !matches!(hoff, GraphNode::Handoff { .. }) {
928                continue;
929            }
930            // Receivers from the handoff. (Should really only be one).
931            for (_edge, succ_id) in self.node_successors(hoff_id) {
932                let succ_sg = self
933                    .node_subgraph(succ_id)
934                    .expect("bug: successor not in subgraph, may be a doubled/adjacent handoff");
935                subgraph_handoffs[succ_sg].0.push(hoff_id);
936            }
937            // Senders into the handoff. (Should really only be one).
938            for (_edge, pred_id) in self.node_predecessors(hoff_id) {
939                let pred_sg = self
940                    .node_subgraph(pred_id)
941                    .expect("bug: predecessor not in subgraph, may be a doubled/adjacent handoff");
942                subgraph_handoffs[pred_sg].1.push(hoff_id);
943            }
944        }
945
946        subgraph_handoffs
947    }
948
949    /// Emit this graph as runnable Rust source code tokens that execute inline.
950    /// Generates a flat `async move |df: &mut Context|` closure where subgraph
951    /// blocks are inlined in topological order, using local `Vec<T>` buffers
952    /// instead of runtime handoffs. Each call to the closure runs one tick.
953    ///
954    /// The generated code block evaluates to a `Dfir` instance wrapping the
955    /// closure. Operator prologues run at construction time on the `Context`
956    /// before it is moved into `Dfir::new`. `Dfir` provides the `Context`
957    /// to the closure on each tick run.
958    ///
959    /// # Errors
960    ///
961    /// Returns all diagnostics as `Err(diagnostics)` if any are errors
962    /// (leaving `&mut diagnostics` empty).
963    pub fn as_code(
964        &self,
965        root: &TokenStream,
966        include_type_guards: bool,
967        prefix: TokenStream,
968        diagnostics: &mut Diagnostics,
969    ) -> Result<TokenStream, Diagnostics> {
970        self.as_code_with_options(root, include_type_guards, true, prefix, diagnostics)
971    }
972
973    /// Like [`Self::as_code`], but with `include_meta` controlling whether
974    /// the runtime meta graph + diagnostics JSON blobs are baked into the
975    /// generated `Dfir::new(...)` call.
976    ///
    /// The simulator calls `Dfir::new()` on each iteration, and as part of that it parses the
    /// meta graph and diagnostics blobs. One of them causes spans to be allocated; each time a
    /// span is allocated, a thread-local `u32` is incremented, and on a long simulator run the
    /// `u32` overflows and panics.
981    pub fn as_code_with_options(
982        &self,
983        root: &TokenStream,
984        include_type_guards: bool,
985        include_meta: bool,
986        prefix: TokenStream,
987        diagnostics: &mut Diagnostics,
988    ) -> Result<TokenStream, Diagnostics> {
989        let df = Ident::new(GRAPH, Span::call_site());
990        let context = Ident::new(CONTEXT, Span::call_site());
991        // Tick-local bump-allocated Vec handoff declarations (inside the tick closure).
992        let bump_ident = Ident::new("__dfir_bump", Span::call_site());
993
994        // 1. Collect all handoff nodes.
995        let handoff_nodes = self
996            .nodes
997            .iter()
998            .filter_map(|(node_id, node)| match node {
999                &GraphNode::Handoff {
1000                    kind,
1001                    src_span,
1002                    dst_span,
1003                } => Some((node_id, kind, (src_span, dst_span))),
1004                GraphNode::Operator(_) => None,
1005                GraphNode::ModuleBoundary { .. } => panic!(),
1006            })
1007            .collect::<Vec<_>>();
1008
1009        // Determine which handoff nodes are tick-boundary (defer_tick) back-edges.
1010        // These must remain as captured Vec<T> since they persist across ticks.
1011        // All other Vec handoffs will be bump-allocated (tick-local).
1012        let back_edge_hoffs_and_lazyness = handoff_nodes
1013            .iter()
1014            .map(|&(node_id, _, _)| node_id)
1015            .filter_map(|node_id| {
1016                if let Some(delay_type) = self.handoff_delay_type(node_id) {
1017                    assert!(
1018                        matches!(delay_type, DelayType::Tick | DelayType::TickLazy),
1019                        "Handoff `DelayType` must be either `Tick` or `TickLazy` (or unset)."
1020                    );
1021                    Some((node_id, matches!(delay_type, DelayType::TickLazy)))
1022                } else {
1023                    None
1024                }
1025            })
1026            .collect::<SparseSecondaryMap<_, _>>();
1027
1028        // Back buffer idents, buf idents, and if they are lazy.
1029        let back_buffer_idents_laziness = handoff_nodes
1030            .iter()
1031            .filter_map(|&(hoff_id, _kind, (src_span, dst_span))| {
1032                back_edge_hoffs_and_lazyness.get(hoff_id).map(|&is_lazy| {
1033                    let span = src_span.join(dst_span).unwrap_or(src_span);
1034                    let back_ident = self.hoff_back_ident(hoff_id, span);
1035                    let buf_ident = self.hoff_buf_ident(hoff_id, span);
1036                    (back_ident, buf_ident, is_lazy)
1037                })
1038            })
1039            .collect::<Vec<_>>();
1040
1041        // Generate swap code for tick-boundary (defer_tick / defer_tick_lazy) handoffs.
1042        // At the end of each tick, swap the regular buffer and back buffer so the
1043        // consumer reads last tick's data from the back buffer.
1044        let back_edge_swap_code = handoff_nodes
1045            .iter()
1046            .filter(|&&(node_id, _kind, _)| back_edge_hoffs_and_lazyness.contains_key(node_id))
1047            .map(|&(hoff_id, _kind, _)| {
1048                let span = self.nodes[hoff_id].span();
1049                let buf_ident = self.hoff_buf_ident(hoff_id, span);
1050                let back_ident = self.hoff_back_ident(hoff_id, span);
1051                quote_spanned! {span=>
1052                    ::std::mem::swap(&mut #buf_ident, &mut #back_ident);
1053                }
1054            })
1055            .collect::<Vec<_>>();
1056
1057        // 2. Collect per-subgraph recv & send handoffs.
1058        let subgraph_handoffs = self.helper_collect_subgraph_handoffs();
1059
1060        // 3. Use pre-computed subgraph topological order.
1061        let all_subgraphs: Vec<_> = self
1062            .subgraph_toposort()
1063            .iter()
1064            .map(|&sg_id| (sg_id, self.subgraph(sg_id)))
1065            .collect();
1066
1067        // TODO(mingwei): If a handoff has no pipe consumers we should drop it as soon as possible, after all reference
1068        // consumers. Right now we just let these handoffs die at the end of the tick.
1069
1070        let mut op_prologue_code = Vec::new();
1071        let mut op_tick_end_code = Vec::new();
1072        let mut subgraph_blocks = Vec::new();
1073        {
1074            for &(subgraph_id, subgraph_nodes) in all_subgraphs.iter() {
1075                let sg_metrics_ffi = subgraph_id.data().as_ffi();
1076                let (recv_hoffs, send_hoffs) = &subgraph_handoffs[subgraph_id];
1077
1078                // Generate buffer ident helpers for this subgraph's handoffs.
1079                let recv_port_idents: Vec<Ident> = recv_hoffs
1080                    .iter()
1081                    .map(|&hoff_id| self.node_as_ident(hoff_id, true))
1082                    .collect();
1083                let send_port_idents: Vec<Ident> = send_hoffs
1084                    .iter()
1085                    .map(|&hoff_id| self.node_as_ident(hoff_id, false))
1086                    .collect();
1087
1088                // Map handoff node IDs to buffer idents.
1089                let recv_buf_idents: Vec<Ident> = recv_hoffs
1090                    .iter()
1091                    .map(|&hoff_id| self.hoff_buf_ident(hoff_id, self.nodes[hoff_id].span()))
1092                    .collect();
1093                let send_buf_idents: Vec<Ident> = send_hoffs
1094                    .iter()
1095                    .map(|&hoff_id| self.hoff_buf_ident(hoff_id, self.nodes[hoff_id].span()))
1096                    .collect();
1097
1098                // Handoff kinds
1099                let recv_kinds = recv_hoffs
1100                    .iter()
1101                    .map(|&hoff_id| {
1102                        let GraphNode::Handoff { kind, .. } = self.node(hoff_id) else {
1103                            panic!()
1104                        };
1105                        *kind
1106                    })
1107                    .collect::<Vec<_>>();
1108                let send_kinds = send_hoffs
1109                    .iter()
1110                    .map(|&hoff_id| {
1111                        let GraphNode::Handoff { kind, .. } = self.node(hoff_id) else {
1112                            panic!()
1113                        };
1114                        *kind
1115                    })
1116                    .collect::<Vec<_>>();
1117
1118                // Recv port code: drain from buffer into iterator, tracking if non-empty.
1119                // For back-edge (defer_tick) handoffs, drain from the back buffer instead.
1120                // Also update handoff metrics (measured at recv, not send — see graph.rs).
1121                let recv_port_code: Vec<TokenStream> = recv_port_idents
1122                    .iter()
1123                    .zip(recv_buf_idents.iter())
1124                    .zip(recv_kinds.iter())
1125                    .zip(recv_hoffs.iter())
1126                    .map(|(((port_ident, buf_ident), &kind), &hoff_id)| {
1127                        let hoff_ffi = hoff_id.data().as_ffi();
1128                        // Use call_site span for internal identifiers to avoid
1129                        // hygiene issues when invoked through declarative macros
1130                        // (e.g. dfir_expect_warnings!). TODO(#2781): define these once.
1131                        let work_done = Ident::new("__dfir_work_done", Span::call_site());
1132                        let metrics = Ident::new("__dfir_metrics", Span::call_site());
1133
1134                        // Compute len and drain expressions based on handoff kind.
1135                        let (len_expr, drain_expr) = match kind {
1136                            HandoffKind::Singleton | HandoffKind::Optional => (
1137                                quote! { if #buf_ident.is_some() { 1usize } else { 0usize } },
1138                                quote! { #root::dfir_pipes::pull::iter(#buf_ident.take().into_iter()) },
1139                            ),
1140                            HandoffKind::Vec => {
1141                                // Special asymmetric handling for defer tick handoffs, which are double-buffered.
1142                                // The producer writes to the regular buffer; at end-of-tick the buffers are swapped,
1143                                // so the consumer drains from the back buffer (here).
1144                                let drain_ident = if back_edge_hoffs_and_lazyness.contains_key(hoff_id) {
1145                                    &self.hoff_back_ident(hoff_id, buf_ident.span())
1146                                } else {
1147                                    buf_ident
1148                                };
1149                                (
1150                                    quote! { #drain_ident.len() },
1151                                    quote! { #root::dfir_pipes::pull::iter(#drain_ident.drain(..)) },
1152                                )
1153                            }
1154                        };
1155
1156                        quote_spanned! {port_ident.span()=>
1157                            {
1158                                let hoff_len = #len_expr;
1159                                if hoff_len > 0 {
1160                                    #work_done = true;
1161                                }
1162                                let hoff_metrics = &#metrics.handoffs[
1163                                    #root::slotmap::KeyData::from_ffi(#hoff_ffi).into()
1164                                ];
1165                                hoff_metrics.total_items_count.update(|x| x + hoff_len);
1166                                hoff_metrics.curr_items_count.set(hoff_len);
1167                            }
1168                            let #port_ident = #drain_expr;
1169                        }
1170                    })
1171                    .collect();
1172
1173                // Send port code: push into buffer.
1174                let send_port_code: Vec<TokenStream> = send_port_idents
1175                    .iter()
1176                    .zip(send_buf_idents.iter())
1177                    .zip(send_kinds.iter())
1178                    .map(|((port_ident, buf_ident), &kind)| {
1179                        match kind {
1180                            HandoffKind::Singleton => {
1181                                // Singleton slot: store exactly one item, panic on duplicate.
1182                                quote_spanned! {port_ident.span()=>
1183                                    let #port_ident = #root::dfir_pipes::push::for_each(|__item| {
1184                                        if #buf_ident.replace(__item).is_some() {
1185                                            panic!("singleton() received more than one item");
1186                                        }
1187                                    });
1188                                }
1189                            }
1190                            HandoffKind::Optional => {
1191                                // Optional slot: store at most one item, panic on duplicate.
1192                                quote_spanned! {port_ident.span()=>
1193                                    let #port_ident = #root::dfir_pipes::push::for_each(|__item| {
1194                                        if #buf_ident.replace(__item).is_some() {
1195                                            panic!("optional() received more than one item");
1196                                        }
1197                                    });
1198                                }
1199                            }
1200                            HandoffKind::Vec => {
1201                                quote_spanned! {port_ident.span()=>
1202                                    // TODO(mingwei): use `#root::dfir_pipes::push::vec_push`?
1203                                    let #port_ident = #root::dfir_pipes::push::for_each(|item| { #buf_ident.push(item); });
1204                                }
1205                            }
1206                        }
1207                    })
1208                    .collect();
1209
1210                // All nodes in a subgraph should be in the same loop.
1211                let loop_id = self.node_loop(subgraph_nodes[0]);
1212
1213                let mut subgraph_op_iter_code = Vec::new();
1214                let mut subgraph_op_iter_after_code = Vec::new();
1215                {
1216                    let pull_to_push_idx = self.find_pull_to_push_idx(subgraph_nodes);
1217
1218                    let (pull_half, push_half) = subgraph_nodes.split_at(pull_to_push_idx);
1219                    let nodes_iter = pull_half.iter().chain(push_half.iter().rev());
1220
1221                    for (idx, &node_id) in nodes_iter.enumerate() {
1222                        let node = &self.nodes[node_id];
1223                        assert!(
1224                            matches!(node, GraphNode::Operator(_)),
1225                            "Handoffs are not part of subgraphs."
1226                        );
1227                        let op_inst = &self.operator_instances[node_id];
1228
1229                        let op_span = node.span();
1230                        let op_name = op_inst.op_constraints.name;
1231                        // Use op's span for root. #root is expected to be correct, any errors should span back to the op gen.
1232                        let root = change_spans(root.clone(), op_span);
1233                        let op_constraints = OPERATORS
1234                            .iter()
1235                            .find(|op| op_name == op.name)
1236                            .unwrap_or_else(|| panic!("Failed to find op: {}", op_name));
1237
1238                        let ident = self.node_as_ident(node_id, false);
1239
1240                        {
1241                            // TODO clean this up.
1242                            // Collect input arguments (predecessors).
1243                            let mut input_edges = self
1244                                .graph
1245                                .predecessor_edges(node_id)
1246                                .map(|edge_id| (self.edge_ports(edge_id).1, edge_id))
1247                                .collect::<Vec<_>>();
1248                            // Ensure sorted by port index.
1249                            input_edges.sort();
1250
1251                            let inputs = input_edges
1252                                .iter()
1253                                .map(|&(_port, edge_id)| {
1254                                    let (pred, _) = self.edge(edge_id);
1255                                    self.node_as_ident(pred, true)
1256                                })
1257                                .collect::<Vec<_>>();
1258
1259                            // Collect output arguments (successors).
1260                            let mut output_edges = self
1261                                .graph
1262                                .successor_edges(node_id)
1263                                .map(|edge_id| (&self.ports[edge_id].0, edge_id))
1264                                .collect::<Vec<_>>();
1265                            // Ensure sorted by port index.
1266                            output_edges.sort();
1267
1268                            let outputs = output_edges
1269                                .iter()
1270                                .map(|&(_port, edge_id)| {
1271                                    let (_, succ) = self.edge(edge_id);
1272                                    self.node_as_ident(succ, false)
1273                                })
1274                                .collect::<Vec<_>>();
1275
1276                            let is_pull = idx < pull_to_push_idx;
1277
1278                            // There's a bit of dark magic hidden in `Span`s... you'd think it's just a `file:line:column`,
1279                            // but it has one extra bit of info for _name resolution_, used for `Ident`s. `Span::call_site()`
1280                            // has the (unhygienic) resolution we want, an ident is just solely determined by its string name,
1281                            // which is what you'd expect out of unhygienic proc macros like this. Meanwhile, declarative macros
1282                            // use `Span::mixed_site()` which is weird and I don't understand it. It turns out that if you call
1283                            // the dfir syntax proc macro from _within_ a declarative macro then `op_span` will have the
1284                            // bad `Span::mixed_site()` name resolution and cause "Cannot find value `df/context`" errors. So
1285                            // we call `.resolved_at()` to fix resolution back to `Span::call_site()`. -Mingwei
1286                            let df_local = &Ident::new(GRAPH, op_span.resolved_at(df.span()));
1287                            let context = &Ident::new(CONTEXT, op_span.resolved_at(context.span()));
1288
1289                            let singletons_resolved =
1290                                self.helper_resolve_singletons(node_id, op_span);
1291
1292                            let arguments = &process_singletons::postprocess_singletons(
1293                                op_inst.arguments_raw.clone(),
1294                                singletons_resolved,
1295                            );
1296
1297                            let source_tag = 'a: {
1298                                if let Some(tag) = self.operator_tag.get(node_id).cloned() {
1299                                    break 'a tag;
1300                                }
1301
1302                                if proc_macro::is_available() {
1303                                    let op_span = op_span.unwrap();
1304                                    break 'a format!(
1305                                        "loc_{}_{}_{}_{}_{}",
1306                                        crate::pretty_span::make_source_path_relative(
1307                                            &op_span.file()
1308                                        )
1309                                        .display()
1310                                        .to_string()
1311                                        .replace(|x: char| !x.is_ascii_alphanumeric(), "_"),
1312                                        op_span.start().line(),
1313                                        op_span.start().column(),
1314                                        op_span.end().line(),
1315                                        op_span.end().column(),
1316                                    );
1317                                }
1318
1319                                format!(
1320                                    "loc_nopath_{}_{}_{}_{}",
1321                                    op_span.start().line,
1322                                    op_span.start().column,
1323                                    op_span.end().line,
1324                                    op_span.end().column
1325                                )
1326                            };
1327
1328                            let work_fn = format_ident!(
1329                                "{}__{}__{}",
1330                                ident,
1331                                op_name,
1332                                source_tag,
1333                                span = op_span
1334                            );
1335                            let work_fn_async = format_ident!("{}__async", work_fn, span = op_span);
1336
1337                            let context_args = WriteContextArgs {
1338                                root: &root,
1339                                df_ident: df_local,
1340                                context,
1341                                subgraph_id,
1342                                node_id,
1343                                loop_id,
1344                                op_span,
1345                                op_tag: self.operator_tag.get(node_id).cloned(),
1346                                work_fn: &work_fn,
1347                                work_fn_async: &work_fn_async,
1348                                ident: &ident,
1349                                is_pull,
1350                                inputs: &inputs,
1351                                outputs: &outputs,
1352                                op_name,
1353                                op_inst,
1354                                arguments,
1355                            };
1356
1357                            let write_result =
1358                                (op_constraints.write_fn)(&context_args, diagnostics);
1359                            let OperatorWriteOutput {
1360                                write_prologue,
1361                                write_iterator,
1362                                write_iterator_after,
1363                                write_tick_end,
1364                            } = write_result.unwrap_or_else(|()| {
1365                                assert!(
1366                                    diagnostics.has_error(),
1367                                    "Operator `{}` returned `Err` but emitted no diagnostics, this is a bug.",
1368                                    op_name,
1369                                );
1370                                OperatorWriteOutput {
1371                                    write_iterator: null_write_iterator_fn(&context_args),
1372                                    ..Default::default()
1373                                }
1374                            });
1375
1376                            op_prologue_code.push(syn::parse_quote! {
1377                                #[allow(non_snake_case)]
1378                                #[inline(always)]
1379                                fn #work_fn<T>(thunk: impl ::std::ops::FnOnce() -> T) -> T {
1380                                    thunk()
1381                                }
1382
1383                                #[allow(non_snake_case)]
1384                                #[inline(always)]
1385                                async fn #work_fn_async<T>(
1386                                    thunk: impl ::std::future::Future<Output = T>,
1387                                ) -> T {
1388                                    thunk.await
1389                                }
1390                            });
1391                            op_prologue_code.push(write_prologue);
1392                            op_tick_end_code.push(write_tick_end);
1393                            subgraph_op_iter_code.push(write_iterator);
1394
1395                            if include_type_guards {
1396                                let type_guard = if is_pull {
1397                                    quote_spanned! {op_span=>
1398                                        let #ident = {
1399                                            #[allow(non_snake_case)]
1400                                            #[inline(always)]
1401                                            pub fn #work_fn<Item, Input>(input: Input)
1402                                                -> impl #root::dfir_pipes::pull::Pull<Item = Item, Meta = (), CanPend = Input::CanPend, CanEnd = Input::CanEnd>
1403                                            where
1404                                                Input: #root::dfir_pipes::pull::Pull<Item = Item, Meta = ()>,
1405                                            {
1406                                                #root::pin_project_lite::pin_project! {
1407                                                    #[repr(transparent)]
1408                                                    struct Pull<Item, Input: #root::dfir_pipes::pull::Pull<Item = Item>> {
1409                                                        #[pin]
1410                                                        inner: Input
1411                                                    }
1412                                                }
1413
1414                                                impl<Item, Input> #root::dfir_pipes::pull::Pull for Pull<Item, Input>
1415                                                where
1416                                                    Input: #root::dfir_pipes::pull::Pull<Item = Item>,
1417                                                {
1418                                                    type Ctx<'ctx> = Input::Ctx<'ctx>;
1419
1420                                                    type Item = Item;
1421                                                    type Meta = Input::Meta;
1422                                                    type CanPend = Input::CanPend;
1423                                                    type CanEnd = Input::CanEnd;
1424
1425                                                    #[inline(always)]
1426                                                    fn pull(
1427                                                        self: ::std::pin::Pin<&mut Self>,
1428                                                        ctx: &mut Self::Ctx<'_>,
1429                                                    ) -> #root::dfir_pipes::pull::PullStep<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
1430                                                        #root::dfir_pipes::pull::Pull::pull(self.project().inner, ctx)
1431                                                    }
1432
1433                                                    #[inline(always)]
1434                                                    fn size_hint(&self) -> (usize, Option<usize>) {
1435                                                        #root::dfir_pipes::pull::Pull::size_hint(&self.inner)
1436                                                    }
1437                                                }
1438
1439                                                Pull {
1440                                                    inner: input
1441                                                }
1442                                            }
1443                                            #work_fn::<_, _>( #ident )
1444                                        };
1445                                    }
1446                                } else {
1447                                    quote_spanned! {op_span=>
1448                                        let #ident = {
1449                                            #[allow(non_snake_case)]
1450                                            #[inline(always)]
1451                                            pub fn #work_fn<Item, Psh>(psh: Psh) -> impl #root::dfir_pipes::push::Push<Item, (), CanPend = Psh::CanPend>
1452                                            where
1453                                                Psh: #root::dfir_pipes::push::Push<Item, ()>
1454                                            {
1455                                                #root::pin_project_lite::pin_project! {
1456                                                    #[repr(transparent)]
1457                                                    struct PushGuard<Psh> {
1458                                                        #[pin]
1459                                                        inner: Psh,
1460                                                    }
1461                                                }
1462
1463                                                impl<Item, Psh> #root::dfir_pipes::push::Push<Item, ()> for PushGuard<Psh>
1464                                                where
1465                                                    Psh: #root::dfir_pipes::push::Push<Item, ()>,
1466                                                {
1467                                                    type Ctx<'ctx> = Psh::Ctx<'ctx>;
1468
1469                                                    type CanPend = Psh::CanPend;
1470
1471                                                    #[inline(always)]
1472                                                    fn poll_ready(
1473                                                        self: ::std::pin::Pin<&mut Self>,
1474                                                        ctx: &mut Self::Ctx<'_>,
1475                                                    ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1476                                                        #root::dfir_pipes::push::Push::poll_ready(self.project().inner, ctx)
1477                                                    }
1478
1479                                                    #[inline(always)]
1480                                                    fn start_send(
1481                                                        self: ::std::pin::Pin<&mut Self>,
1482                                                        item: Item,
1483                                                        meta: (),
1484                                                    ) {
1485                                                        #root::dfir_pipes::push::Push::start_send(self.project().inner, item, meta)
1486                                                    }
1487
1488                                                    #[inline(always)]
1489                                                    fn poll_finalize(
1490                                                        self: ::std::pin::Pin<&mut Self>,
1491                                                        ctx: &mut Self::Ctx<'_>,
1492                                                    ) -> #root::dfir_pipes::push::PushStep<Self::CanPend> {
1493                                                        #root::dfir_pipes::push::Push::poll_finalize(self.project().inner, ctx)
1494                                                    }
1495
1496                                                    #[inline(always)]
1497                                                    fn size_hint(
1498                                                        self: ::std::pin::Pin<&mut Self>,
1499                                                        hint: (usize, Option<usize>),
1500                                                    ) {
1501                                                        #root::dfir_pipes::push::Push::size_hint(self.project().inner, hint)
1502                                                    }
1503                                                }
1504
1505                                                PushGuard {
1506                                                    inner: psh
1507                                                }
1508                                            }
1509                                            #work_fn( #ident )
1510                                        };
1511                                    }
1512                                };
1513                                subgraph_op_iter_code.push(type_guard);
1514                            }
1515                            subgraph_op_iter_after_code.push(write_iterator_after);
1516                        }
1517                    }
1518
1519                    {
1520                        // Determine pull and push halves of the `Pivot`.
1521                        let pull_ident = if 0 < pull_to_push_idx {
1522                            self.node_as_ident(subgraph_nodes[pull_to_push_idx - 1], false)
1523                        } else {
1524                            // Entire subgraph is push (with a single recv/pull handoff input).
1525                            recv_port_idents[0].clone()
1526                        };
1527
1528                        #[rustfmt::skip]
1529                        let push_ident = if let Some(&node_id) =
1530                            subgraph_nodes.get(pull_to_push_idx)
1531                        {
1532                            self.node_as_ident(node_id, false)
1533                        } else if 1 == send_port_idents.len() {
1534                            // Entire subgraph is pull (with a single send/push handoff output).
1535                            send_port_idents[0].clone()
1536                        } else {
1537                            diagnostics.push(Diagnostic::spanned(
1538                                pull_ident.span(),
1539                                Level::Error,
1540                                "Degenerate subgraph detected, is there a disconnected `null()` or other degenerate pipeline somewhere?",
1541                            ));
1542                            continue;
1543                        };
1544
1545                        // Pivot span is combination of pull and push spans (or if not possible, just take the push).
1546                        let pivot_span = pull_ident
1547                            .span()
1548                            .join(push_ident.span())
1549                            .unwrap_or_else(|| push_ident.span());
1550                        let pivot_fn_ident = Ident::new(
1551                            &format!("pivot_run_sg_{:?}", subgraph_id.data()),
1552                            pivot_span,
1553                        );
1554                        let root = change_spans(root.clone(), pivot_span);
1555                        subgraph_op_iter_code.push(quote_spanned! {pivot_span=>
1556                            #[inline(always)]
1557                            fn #pivot_fn_ident<Pul, Psh, Item>(pull: Pul, push: Psh)
1558                                -> impl ::std::future::Future<Output = ()>
1559                            where
1560                                Pul: #root::dfir_pipes::pull::Pull<Item = Item>,
1561                                Psh: #root::dfir_pipes::push::Push<Item, Pul::Meta>,
1562                            {
1563                                #root::dfir_pipes::pull::Pull::send_push(pull, push)
1564                            }
1565                            (#pivot_fn_ident)(#pull_ident, #push_ident).await;
1566                        });
1567                    }
1568                };
1569
1570                // Each subgraph block is an async block so it can be individually instrumented.
1571                // Note: this ident is for the subgraph future, not a runtime SubgraphId binding
1572                // (unlike the scheduled path's `sg_ident`).
1573                let sg_fut_ident = subgraph_id.as_ident(Span::call_site());
1574
1575                // Generate send-side curr_items_count updates (after subgraph runs).
1576                let send_metrics_code = send_hoffs
1577                    .iter()
1578                    .zip(send_buf_idents.iter())
1579                    .zip(send_kinds.iter())
1580                    .map(|((&hoff_id, buf_ident), &kind)| {
1581                        let hoff_ffi = hoff_id.data().as_ffi();
1582                        let len_expr = match kind {
1583                            HandoffKind::Singleton | HandoffKind::Optional => {
1584                                quote! { if #buf_ident.is_some() { 1 } else { 0 } }
1585                            }
1586                            HandoffKind::Vec => {
1587                                quote! { #buf_ident.len() }
1588                            }
1589                        };
1590                        quote! {
1591                            __dfir_metrics.handoffs[
1592                                #root::slotmap::KeyData::from_ffi(#hoff_ffi).into()
1593                            ].curr_items_count.set(#len_expr);
1594                        }
1595                    })
1596                    .collect::<Vec<_>>();
1597
1598                // Create the handoffs we are about to push to (send).
1599                let send_hoff_make_code = send_buf_idents.iter()
1600                    .zip(send_kinds.iter())
1601                    .zip(send_hoffs.iter())
1602                    .map(|((buf_ident, &kind), &hoff_id)| {
1603                        let span = buf_ident.span();
1604                        if back_edge_hoffs_and_lazyness.contains_key(hoff_id) {
1605                            // Defer_tick send buffers are declared outside the tick closure
1606                            // as std::vec::Vec for O(1) swap. Just clear here.
1607                            quote_spanned! {span=>
1608                                #buf_ident.clear();
1609                            }
1610                        } else {
1611                            match kind {
1612                                HandoffKind::Vec => quote_spanned! {span=>
1613                                    let mut #buf_ident = #root::bumpalo::collections::Vec::new_in(&#bump_ident);
1614                                },
1615                                HandoffKind::Singleton | HandoffKind::Optional => quote_spanned! {span=>
1616                                    let mut #buf_ident = ::std::option::Option::None;
1617                                },
1618                            }
1619                        }
1620                    });
1621                // Drop the handoffs we just drained (recv).
1622                // TODO(mingwei): we could use `.into_iter()` instead of `.drain(..)` to consume the handoffs directly.
1623                // This only works for handoffs within the tick, though, not `defer_tick` handoffs.
1624                let recv_hoff_drop_code = recv_buf_idents
1625                    .iter()
1626                    .zip(recv_hoffs.iter())
1627                    .filter(|&(_, &hoff_id)| !back_edge_hoffs_and_lazyness.contains_key(hoff_id))
1628                    .map(|(buf_ident, _)| {
1629                        let span = buf_ident.span();
1630                        quote_spanned! {span=>
1631                            let _ = #buf_ident;
1632                        }
1633                    });
1634
1635                subgraph_blocks.push(quote! {
1636                    // Create the handoffs we are about to push to (send).
1637                    #( #send_hoff_make_code )*
1638
1639                    let #sg_fut_ident = async {
1640                        let #context = &#df;
1641                        #( #recv_port_code )*
1642                        #( #send_port_code )*
1643                        #( #subgraph_op_iter_code )*
1644                        #( #subgraph_op_iter_after_code )*
1645                    };
1646                    {
1647                        // Instrument w/ the subgraph metrics.
1648                        let sg_metrics = &__dfir_metrics.subgraphs[
1649                            #root::slotmap::KeyData::from_ffi(#sg_metrics_ffi).into()
1650                        ];
1651                        #root::scheduled::metrics::InstrumentSubgraph::new(
1652                            #sg_fut_ident, sg_metrics
1653                        ).await;
1654                        sg_metrics.total_run_count.update(|x| x + 1);
1655
1656                        // Update send (output) handoff metrics.
1657                        #( #send_metrics_code )*
1658
1659                        // Drop the handoffs we just drained (recv).
1660                        #( #recv_hoff_drop_code )*
1661                    }
1662                });
1663
1664                // Collect per-subgraph prologues into the main prologue lists.
1665                // (They are already pushed above in the operator loop.)
1666            }
1667        }
1668
1669        if diagnostics.has_error() {
1670            return Err(std::mem::take(diagnostics));
1671        }
1672        let _ = diagnostics; // Ensure no more diagnostics may be added after checking for errors.
1673
1674        let (meta_graph_arg, diagnostics_arg) = if include_meta {
1675            let meta_graph_json = serde_json::to_string(&self).unwrap();
1676            let meta_graph_json = Literal::string(&meta_graph_json);
1677
1678            let serde_diagnostics: Vec<_> = diagnostics.iter().map(Diagnostic::to_serde).collect();
1679            let diagnostics_json = serde_json::to_string(&*serde_diagnostics).unwrap();
1680            let diagnostics_json = Literal::string(&diagnostics_json);
1681
1682            (
1683                quote! { Some(#meta_graph_json) },
1684                quote! { Some(#diagnostics_json) },
1685            )
1686        } else {
1687            (quote! { None }, quote! { None })
1688        };
1689
1690        // Generate metrics initialization: one entry per handoff and per subgraph.
1691        let metrics_init_code = {
1692            let handoff_inits = handoff_nodes.iter().map(|&(node_id, _, _)| {
1693                let ffi = node_id.data().as_ffi();
1694                quote! {
1695                    dfir_metrics.handoffs.insert(
1696                        #root::slotmap::KeyData::from_ffi(#ffi).into(),
1697                        ::std::default::Default::default(),
1698                    );
1699                }
1700            });
1701            let subgraph_inits = all_subgraphs.iter().map(|&(sg_id, _)| {
1702                let ffi = sg_id.data().as_ffi();
1703                quote! {
1704                    dfir_metrics.subgraphs.insert(
1705                        #root::slotmap::KeyData::from_ffi(#ffi).into(),
1706                        ::std::default::Default::default(),
1707                    );
1708                }
1709            });
1710            handoff_inits.chain(subgraph_inits).collect::<Vec<_>>()
1711        };
1712
1713        // For creating back-buffer handoff vecs.
1714        let back_buffer_idents = back_buffer_idents_laziness
1715            .iter()
1716            .map(|(back_ident, _, _)| back_ident);
1717        // For creating the send-side buffer for defer_tick handoffs (also outside the closure).
1718        let defer_tick_buf_idents = back_buffer_idents_laziness
1719            .iter()
1720            .map(|(_, buf_ident, _)| buf_ident);
1721        // For checking if we should start the next tick (`schedule_subgraph`):
1722        // check the regular (send) buffer for non-lazy defer_tick handoffs, since
1723        // that's where the producer writes during this tick.
1724        let non_lazy_buf_idents = back_buffer_idents_laziness
1725            .iter()
1726            .filter_map(|(_, buf_ident, is_lazy)| (!is_lazy).then_some(buf_ident));
1727
1728        // Prologues and buffer declarations persist across ticks (outside the closure).
1729        // Subgraph blocks run each tick (inside the closure).
1730        Ok(quote! {
1731            {
1732                #prefix
1733
1734                use #root::{var_expr, var_args};
1735
1736                let __dfir_wake_state = ::std::sync::Arc::new(
1737                    #root::scheduled::context::WakeState::default()
1738                );
1739
1740                let __dfir_metrics = {
1741                    let mut dfir_metrics = #root::scheduled::metrics::DfirMetrics::default();
1742                    #( #metrics_init_code )*
1743                    ::std::rc::Rc::new(dfir_metrics)
1744                };
1745
1746                #[allow(unused_mut)]
1747                let mut #df = #root::scheduled::context::Context::new(
1748                    ::std::clone::Clone::clone(&__dfir_wake_state),
1749                    __dfir_metrics,
1750                );
1751
1752                #( #op_prologue_code )*
1753
1754                // For tick-boundary handoffs (`defer_tick` / `defer_tick_lazy`), declare both the
1755                // send buffer and the "back" buffer as std::vec::Vec outside the tick closure.
1756                // This enables O(1) mem::swap at end of tick for double-buffering.
1757                #( let mut #back_buffer_idents = ::std::vec::Vec::new(); )*
1758                #( let mut #defer_tick_buf_idents = ::std::vec::Vec::new(); )*
1759
1760                // Bump allocator for handoffs (except for back-edge handoffs, above).
1761                let mut #bump_ident = #root::bumpalo::Bump::new();
1762
1763                // Pre-set to true so the first tick always returns true
1764                // (matching Dfir pre-scheduling behavior). Subsequent ticks
1765                // start false (from take()) and are set true by recv port code
1766                // if any handoff buffer has data.
1767                let mut __dfir_work_done = true;
1768                #[allow(unused_qualifications, unused_mut, unused_variables, clippy::await_holding_refcell_ref, clippy::deref_addrof)]
1769                let __dfir_inline_tick = async move |#df: &mut #root::scheduled::context::Context| {
1770                    // Reset arena between ticks (start-of-tick)
1771                    #bump_ident.reset();
1772
1773                    {
1774                        let __dfir_metrics = #df.metrics();
1775
1776                        #( #subgraph_blocks )*
1777
1778                        // For non-lazy defer_tick: if any deferred buffer has data,
1779                        // signal that another tick should run.
1780                        if false #( || !#non_lazy_buf_idents.is_empty() )* {
1781                            #df.schedule_subgraph(true);
1782                        }
1783
1784                        // Double-buffer swap for defer_tick handoffs: move last tick's producer output (regular buffer)
1785                        // into the back buffer for the consumer to drain.
1786                        #( #back_edge_swap_code )*
1787                    }
1788
1789                    // End-of-tick per-operator state handling (i.e. 'tick persistence).
1790                    #( #op_tick_end_code )*
1791
1792                    #df.__end_tick();
1793
1794                    ::std::mem::take(&mut __dfir_work_done)
1795                };
1796                #root::scheduled::context::Dfir::new(
1797                    __dfir_inline_tick,
1798                    #df,
1799                    #meta_graph_arg,
1800                    #diagnostics_arg,
1801                )
1802            }
1803        })
1804    }
1805
1806    /// Color mode (pull vs. push, handoff vs. comp) for nodes. Some nodes can be push *OR* pull;
1807    /// those nodes will not be set in the returned map.
1808    pub fn node_color_map(&self) -> SparseSecondaryMap<GraphNodeId, Color> {
1809        let mut node_color_map: SparseSecondaryMap<GraphNodeId, Color> = self
1810            .node_ids()
1811            .filter_map(|node_id| {
1812                let op_color = self.node_color(node_id)?;
1813                Some((node_id, op_color))
1814            })
1815            .collect();
1816
1817        // Fill in rest via subgraphs.
1818        for sg_nodes in self.subgraph_nodes.values() {
1819            let pull_to_push_idx = self.find_pull_to_push_idx(sg_nodes);
1820
1821            for (idx, node_id) in sg_nodes.iter().copied().enumerate() {
1822                let is_pull = idx < pull_to_push_idx;
1823                node_color_map.insert(node_id, if is_pull { Color::Pull } else { Color::Push });
1824            }
1825        }
1826
1827        node_color_map
1828    }
1829
1830    /// Writes this graph as mermaid into a string.
1831    pub fn to_mermaid(&self, write_config: &WriteConfig) -> String {
1832        let mut output = String::new();
1833        self.write_mermaid(&mut output, write_config).unwrap();
1834        output
1835    }
1836
1837    /// Writes this graph as mermaid into the given `Write`.
1838    pub fn write_mermaid(
1839        &self,
1840        output: impl std::fmt::Write,
1841        write_config: &WriteConfig,
1842    ) -> std::fmt::Result {
1843        let mut graph_write = Mermaid::new(output);
1844        self.write_graph(&mut graph_write, write_config)
1845    }
1846
1847    /// Writes this graph as DOT (graphviz) into a string.
1848    pub fn to_dot(&self, write_config: &WriteConfig) -> String {
1849        let mut output = String::new();
1850        let mut graph_write = Dot::new(&mut output);
1851        self.write_graph(&mut graph_write, write_config).unwrap();
1852        output
1853    }
1854
1855    /// Writes this graph as DOT (graphviz) into the given `Write`.
1856    pub fn write_dot(
1857        &self,
1858        output: impl std::fmt::Write,
1859        write_config: &WriteConfig,
1860    ) -> std::fmt::Result {
1861        let mut graph_write = Dot::new(output);
1862        self.write_graph(&mut graph_write, write_config)
1863    }
1864
    /// Write out this graph using the given `GraphWrite`. E.g. `Mermaid` or `Dot`.
1866    pub(crate) fn write_graph<W>(
1867        &self,
1868        mut graph_write: W,
1869        write_config: &WriteConfig,
1870    ) -> Result<(), W::Err>
1871    where
1872        W: GraphWrite,
1873    {
1874        fn helper_edge_label(
1875            src_port: &PortIndexValue,
1876            dst_port: &PortIndexValue,
1877        ) -> Option<String> {
1878            let src_label = match src_port {
1879                PortIndexValue::Path(path) => Some(path.to_token_stream().to_string()),
1880                PortIndexValue::Int(index) => Some(index.value.to_string()),
1881                _ => None,
1882            };
1883            let dst_label = match dst_port {
1884                PortIndexValue::Path(path) => Some(path.to_token_stream().to_string()),
1885                PortIndexValue::Int(index) => Some(index.value.to_string()),
1886                _ => None,
1887            };
1888            let label = match (src_label, dst_label) {
1889                (Some(l1), Some(l2)) => Some(format!("{}\n{}", l1, l2)),
1890                (Some(l1), None) => Some(l1),
1891                (None, Some(l2)) => Some(l2),
1892                (None, None) => None,
1893            };
1894            label
1895        }
1896
1897        // Make node color map one time.
1898        let node_color_map = self.node_color_map();
1899
1900        // Write prologue.
1901        graph_write.write_prologue()?;
1902
1903        // Define nodes.
1904        let mut skipped_handoffs = BTreeSet::new();
1905        for (node_id, node) in self.nodes() {
1906            if matches!(node, GraphNode::Handoff { .. }) && write_config.no_handoffs {
1907                skipped_handoffs.insert(node_id);
1908                continue;
1909            }
1910            graph_write.write_node_definition(
1911                node_id,
1912                &if write_config.op_short_text {
1913                    node.to_name_string()
1914                } else if write_config.op_text_no_imports {
1915                    // Remove any lines that start with "use" (imports)
1916                    let full_text = node.to_pretty_string();
1917                    let mut output = String::new();
1918                    for sentence in full_text.split('\n') {
1919                        if sentence.trim().starts_with("use") {
1920                            continue;
1921                        }
1922                        output.push('\n');
1923                        output.push_str(sentence);
1924                    }
1925                    output.into()
1926                } else {
1927                    node.to_pretty_string()
1928                },
1929                if write_config.no_pull_push {
1930                    None
1931                } else {
1932                    node_color_map.get(node_id).copied()
1933                },
1934            )?;
1935        }
1936
1937        // Write edges.
1938        for (edge_id, (src_id, mut dst_id)) in self.edges() {
1939            // Handling for if `write_config.no_handoffs` true.
1940            if skipped_handoffs.contains(&src_id) {
1941                continue;
1942            }
1943
1944            let (src_port, mut dst_port) = self.edge_ports(edge_id);
1945            if skipped_handoffs.contains(&dst_id) {
1946                // The destination is a hidden handoff. If it has a successor, skip through.
1947                // If it has 0 successors (ref-only singleton), drop this edge entirely —
1948                // the data dependency is captured via the reference edge instead.
1949                let mut handoff_succs = self.node_successors(dst_id);
1950                if handoff_succs.len() == 0 {
1951                    continue;
1952                }
1953                let (succ_edge, succ_node) = handoff_succs.next().unwrap();
1954                dst_id = succ_node;
1955                dst_port = self.edge_ports(succ_edge).1;
1956            }
1957
1958            let label = helper_edge_label(src_port, dst_port);
1959            let delay_type = self
1960                .node_op_inst(dst_id)
1961                .and_then(|op_inst| (op_inst.op_constraints.input_delaytype_fn)(dst_port));
1962            graph_write.write_edge(src_id, dst_id, delay_type, label.as_deref(), false)?;
1963        }
1964
1965        // Write reference edges.
1966        if !write_config.no_references {
1967            for dst_id in self.node_ids() {
1968                for src_ref_id in self
1969                    .node_handoff_references(dst_id)
1970                    .iter()
1971                    .filter_map(|r| r.node_id)
1972                {
1973                    // When handoffs are hidden, resolve through to the predecessor of
1974                    // the singleton handoff so the edge points from the actual writer.
1975                    let resolved_src = if skipped_handoffs.contains(&src_ref_id) {
1976                        self.node_predecessor_nodes(src_ref_id).next()
1977                    } else {
1978                        Some(src_ref_id)
1979                    };
1980                    let Some(resolved_src) = resolved_src else {
1981                        continue;
1982                    };
1983                    let delay_type = Some(DelayType::Stratum);
1984                    let label = None;
1985                    graph_write.write_edge(resolved_src, dst_id, delay_type, label, true)?;
1986                }
1987            }
1988        }
1989
1990        // The following code is a little bit tricky. Generally, the graph has the hierarchy:
1991        // `loop -> subgraph -> varname -> node`. However, each of these can be disabled via the `write_config`. To
1992        // handle both the enabled and disabled case, this code is structured as a series of nested loops. If the layer
1993        // is disabled, then the HashMap<Option<KEY>, Vec<VALUE>> will only have a single key (`None`) with a
1994        // corresponding `Vec` value containing everything. This way no special handling is needed for the next layer.
1995
1996        // Loop -> Subgraphs
1997        let loop_subgraphs = self.subgraph_ids().map(|sg_id| {
1998            let loop_id = if write_config.no_loops {
1999                None
2000            } else {
2001                self.subgraph_loop(sg_id)
2002            };
2003            (loop_id, sg_id)
2004        });
2005        let loop_subgraphs = into_group_map(loop_subgraphs);
2006        for (loop_id, subgraph_ids) in loop_subgraphs {
2007            if let Some(loop_id) = loop_id {
2008                graph_write.write_loop_start(loop_id)?;
2009            }
2010
2011            // Subgraph -> Varnames.
2012            let subgraph_varnames_nodes = subgraph_ids.into_iter().flat_map(|sg_id| {
2013                self.subgraph(sg_id).iter().copied().map(move |node_id| {
2014                    let opt_sg_id = if write_config.no_subgraphs {
2015                        None
2016                    } else {
2017                        Some(sg_id)
2018                    };
2019                    (opt_sg_id, (self.node_varname(node_id), node_id))
2020                })
2021            });
2022            let subgraph_varnames_nodes = into_group_map(subgraph_varnames_nodes);
2023            for (sg_id, varnames) in subgraph_varnames_nodes {
2024                if let Some(sg_id) = sg_id {
2025                    graph_write.write_subgraph_start(sg_id)?;
2026                }
2027
2028                // Varnames -> Nodes.
2029                let varname_nodes = varnames.into_iter().map(|(varname, node)| {
2030                    let varname = if write_config.no_varnames {
2031                        None
2032                    } else {
2033                        varname
2034                    };
2035                    (varname, node)
2036                });
2037                let varname_nodes = into_group_map(varname_nodes);
2038                for (varname, node_ids) in varname_nodes {
2039                    if let Some(varname) = varname {
2040                        graph_write.write_varname_start(&varname.0.to_string(), sg_id)?;
2041                    }
2042
2043                    // Write all nodes.
2044                    for node_id in node_ids {
2045                        graph_write.write_node(node_id)?;
2046                    }
2047
2048                    if varname.is_some() {
2049                        graph_write.write_varname_end()?;
2050                    }
2051                }
2052
2053                if sg_id.is_some() {
2054                    graph_write.write_subgraph_end()?;
2055                }
2056            }
2057
2058            if loop_id.is_some() {
2059                graph_write.write_loop_end()?;
2060            }
2061        }
2062
2063        // Write epilogue.
2064        graph_write.write_epilogue()?;
2065
2066        Ok(())
2067    }
2068
2069    /// Convert back into surface syntax.
2070    pub fn surface_syntax_string(&self) -> String {
2071        let mut string = String::new();
2072        self.write_surface_syntax(&mut string).unwrap();
2073        string
2074    }
2075
2076    /// Convert back into surface syntax.
2077    pub fn write_surface_syntax(&self, write: &mut impl std::fmt::Write) -> std::fmt::Result {
2078        for (key, node) in self.nodes.iter() {
2079            match node {
2080                GraphNode::Operator(op) => {
2081                    writeln!(write, "_{:?} = {};", key.data(), op.to_token_stream())?;
2082                }
2083                GraphNode::Handoff {
2084                    kind: HandoffKind::Vec,
2085                    ..
2086                } => {
2087                    writeln!(write, "_{:?} = handoff();", key.data())?;
2088                }
2089                GraphNode::Handoff {
2090                    kind: HandoffKind::Singleton,
2091                    ..
2092                } => {
2093                    writeln!(write, "_{:?} = singleton();", key.data())?;
2094                }
2095                GraphNode::Handoff {
2096                    kind: HandoffKind::Optional,
2097                    ..
2098                } => {
2099                    writeln!(write, "_{:?} = optional();", key.data())?;
2100                }
2101                GraphNode::ModuleBoundary { .. } => panic!(),
2102            }
2103        }
2104        writeln!(write)?;
2105        for (e, (src_key, dst_key)) in self.graph.edges() {
2106            let (src_port, dst_port) = self.edge_ports(e);
2107            let src_port_str = if src_port.is_specified() {
2108                format!("[{}]", src_port)
2109            } else {
2110                String::new()
2111            };
2112            let dst_port_str = if dst_port.is_specified() {
2113                format!("[{}]", dst_port)
2114            } else {
2115                String::new()
2116            };
2117            writeln!(
2118                write,
2119                "_{:?}{} -> {}_{:?};",
2120                src_key.data(),
2121                src_port_str,
2122                dst_port_str,
2123                dst_key.data()
2124            )?;
2125        }
2126        Ok(())
2127    }
2128
2129    /// Convert into a [mermaid](https://mermaid-js.github.io/) graph. Ignores subgraphs.
2130    pub fn mermaid_string_flat(&self) -> String {
2131        let mut string = String::new();
2132        self.write_mermaid_flat(&mut string).unwrap();
2133        string
2134    }
2135
2136    /// Convert into a [mermaid](https://mermaid-js.github.io/) graph. Ignores subgraphs.
2137    pub fn write_mermaid_flat(&self, write: &mut impl std::fmt::Write) -> std::fmt::Result {
2138        writeln!(write, "flowchart TB")?;
2139        for (key, node) in self.nodes.iter() {
2140            match node {
2141                GraphNode::Operator(operator) => writeln!(
2142                    write,
2143                    "    %% {span}\n    {id:?}[\"{row_col} <tt>{code}</tt>\"]",
2144                    span = PrettySpan(node.span()),
2145                    id = key.data(),
2146                    row_col = PrettyRowCol(node.span()),
2147                    code = operator
2148                        .to_token_stream()
2149                        .to_string()
2150                        .replace('&', "&amp;")
2151                        .replace('<', "&lt;")
2152                        .replace('>', "&gt;")
2153                        .replace('"', "&quot;")
2154                        .replace('\n', "<br>"),
2155                ),
2156                GraphNode::Handoff {
2157                    kind: HandoffKind::Vec,
2158                    ..
2159                } => {
2160                    writeln!(write, r#"    {:?}{{"{}"}}"#, key.data(), HANDOFF_NODE_STR)
2161                }
2162                GraphNode::Handoff {
2163                    kind: HandoffKind::Singleton | HandoffKind::Optional,
2164                    ..
2165                } => {
2166                    writeln!(
2167                        write,
2168                        r#"    {:?}{{"{}"}}"#,
2169                        key.data(),
2170                        SINGLETON_SLOT_NODE_STR
2171                    )
2172                }
2173                GraphNode::ModuleBoundary { .. } => {
2174                    writeln!(
2175                        write,
2176                        r#"    {:?}{{"{}"}}"#,
2177                        key.data(),
2178                        MODULE_BOUNDARY_NODE_STR
2179                    )
2180                }
2181            }?;
2182        }
2183        writeln!(write)?;
2184        for (_e, (src_key, dst_key)) in self.graph.edges() {
2185            writeln!(write, "    {:?}-->{:?}", src_key.data(), dst_key.data())?;
2186        }
2187        Ok(())
2188    }
2189}
2190
2191/// Loops
2192impl DfirGraph {
2193    /// Iterator over all loop IDs.
2194    pub fn loop_ids(&self) -> slotmap::basic::Keys<'_, GraphLoopId, Vec<GraphNodeId>> {
2195        self.loop_nodes.keys()
2196    }
2197
2198    /// Iterator over all loops, ID and members: `(GraphLoopId, Vec<GraphNodeId>)`.
2199    pub fn loops(&self) -> slotmap::basic::Iter<'_, GraphLoopId, Vec<GraphNodeId>> {
2200        self.loop_nodes.iter()
2201    }
2202
2203    /// Create a new loop context, with the given parent loop (or `None`).
2204    pub fn insert_loop(&mut self, parent_loop: Option<GraphLoopId>) -> GraphLoopId {
2205        let loop_id = self.loop_nodes.insert(Vec::new());
2206        self.loop_children.insert(loop_id, Vec::new());
2207        if let Some(parent_loop) = parent_loop {
2208            self.loop_parent.insert(loop_id, parent_loop);
2209            self.loop_children
2210                .get_mut(parent_loop)
2211                .unwrap()
2212                .push(loop_id);
2213        } else {
2214            self.root_loops.push(loop_id);
2215        }
2216        loop_id
2217    }
2218
2219    /// Get a node's loop context (or `None` for root).
2220    pub fn node_loop(&self, node_id: GraphNodeId) -> Option<GraphLoopId> {
2221        self.node_loops.get(node_id).copied()
2222    }
2223
2224    /// Get a subgraph's loop context (or `None` for root).
2225    pub fn subgraph_loop(&self, subgraph_id: GraphSubgraphId) -> Option<GraphLoopId> {
2226        let &node_id = self.subgraph(subgraph_id).first().unwrap();
2227        let out = self.node_loop(node_id);
2228        debug_assert!(
2229            self.subgraph(subgraph_id)
2230                .iter()
2231                .all(|&node_id| self.node_loop(node_id) == out),
2232            "Subgraph nodes should all have the same loop context."
2233        );
2234        out
2235    }
2236
2237    /// Get a loop context's parent loop context (or `None` for root).
2238    pub fn loop_parent(&self, loop_id: GraphLoopId) -> Option<GraphLoopId> {
2239        self.loop_parent.get(loop_id).copied()
2240    }
2241
2242    /// Get a loop context's child loops.
2243    pub fn loop_children(&self, loop_id: GraphLoopId) -> &Vec<GraphLoopId> {
2244        self.loop_children.get(loop_id).unwrap()
2245    }
2246}
2247
/// Configuration for writing graphs.
// Every field is a plain `bool`, so the derived `Default` leaves all flags `false`: nothing is
// hidden and operator text is not shortened or filtered.
//
// NOTE(review): with the `clap-derive` feature enabled this struct derives `clap::Args` and each
// field is a `--long` flag. The `///` field docs below presumably double as the generated
// `--help` text, so rewording them would change user-facing output; confirm before editing.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "clap-derive", derive(clap::Args))]
pub struct WriteConfig {
    // --- Structure toggles: each one hides a layer of the rendered graph. ---
    /// Subgraphs will not be rendered if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_subgraphs: bool,
    /// Variable names will not be rendered if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_varnames: bool,
    /// Will not render pull/push shapes if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_pull_push: bool,
    /// Will not render handoffs if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_handoffs: bool,
    /// Will not render singleton references if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_references: bool,
    /// Will not render loops if set.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub no_loops: bool,

    // --- Operator text toggles: control how much of each operator's source is shown. ---
    /// Op text will only be their name instead of the whole source.
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub op_short_text: bool,
    /// Op text will exclude any line that starts with "use".
    #[cfg_attr(feature = "clap-derive", arg(long))]
    pub op_text_no_imports: bool,
}
2278
/// Enum for choosing between mermaid and dot graph writing.
// Fieldless and `Copy`, so it can be passed by value.
//
// NOTE(review): with the `clap-derive` feature this derives `clap::ValueEnum`, so the variant
// names (and presumably their `///` docs) surface on the command line; confirm before renaming
// or rewording.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "clap-derive", derive(clap::Parser, clap::ValueEnum))]
pub enum WriteGraphType {
    /// Mermaid graphs.
    Mermaid,
    /// Dot (Graphviz) graphs.
    Dot,
}
2288
/// Groups `(key, value)` pairs by key, like [`itertools::Itertools::into_group_map`] but
/// producing a `BTreeMap` instead of a `HashMap`.
///
/// Values that share a key are collected into one `Vec`, in the order the iterator yields them.
/// The resulting map iterates in ascending key order.
fn into_group_map<K, V>(iter: impl IntoIterator<Item = (K, V)>) -> BTreeMap<K, Vec<V>>
where
    K: Ord,
{
    iter.into_iter()
        .fold(BTreeMap::<K, Vec<V>>::new(), |mut groups, (key, value)| {
            groups.entry(key).or_default().push(value);
            groups
        })
}