Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39/// A closure expression bundled with any singleton references it captures.
40///
41/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
42/// alongside the closure's expression. This allows per-closure tracking of singleton
43/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
44pub struct ClosureExpr {
45    pub(crate) expr: DebugExpr,
46    /// Each entry is `(HydroNode::Reference, is_mut: bool)`.
47    /// The index in the Vec determines the ident name via [`handoff_ref_ident`].
48    /// The `access_counter` was assigned at staging time in code order.
49    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
50}
51
52impl Clone for ClosureExpr {
53    fn clone(&self) -> Self {
54        Self {
55            expr: self.expr.clone(),
56            singleton_refs: self
57                .singleton_refs
58                .iter()
59                .map(|(node, is_mut)| {
60                    let HydroNode::Reference {
61                        inner,
62                        kind,
63                        access_counter,
64                        metadata,
65                    } = node
66                    else {
67                        panic!("singleton_refs should only contain HydroNode::Reference");
68                    };
69                    (
70                        HydroNode::Reference {
71                            inner: SharedNode(Rc::clone(&inner.0)),
72                            kind: *kind,
73                            access_counter: access_counter.freeze(),
74                            metadata: metadata.clone(),
75                        },
76                        *is_mut,
77                    )
78                })
79                .collect(),
80        }
81    }
82}
83
84impl Hash for ClosureExpr {
85    fn hash<H: Hasher>(&self, state: &mut H) {
86        self.expr.hash(state);
87        // singleton_refs are structural children (like HydroIrMetadata), not
88        // identity-defining. Two closures with the same expr but different
89        // captured refs are the same closure text — the refs only affect codegen.
90    }
91}
92
93impl serde::Serialize for ClosureExpr {
94    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95        use serde::ser::SerializeStruct;
96        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97        s.serialize_field("expr", &self.expr)?;
98        s.serialize_field(
99            "singleton_refs",
100            &SerializableSingletonRefs(&self.singleton_refs),
101        )?;
102        s.end()
103    }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110        use serde::ser::SerializeSeq;
111        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112        for (node, is_mut) in self.0.iter() {
113            seq.serialize_element(&(node, is_mut))?;
114        }
115        seq.end()
116    }
117}
118
119impl Debug for ClosureExpr {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        Debug::fmt(&self.expr, f)
122    }
123}
124
125impl Display for ClosureExpr {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        Display::fmt(&self.expr, f)
128    }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132    fn from(expr: syn::Expr) -> Self {
133        Self {
134            expr: DebugExpr(Box::new(expr)),
135            singleton_refs: Vec::new(),
136        }
137    }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141    fn from(expr: DebugExpr) -> Self {
142        Self {
143            expr,
144            singleton_refs: Vec::new(),
145        }
146    }
147}
148
149impl ClosureExpr {
150    pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151        Self {
152            expr,
153            singleton_refs,
154        }
155    }
156
157    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
158        Self {
159            expr: self.expr.clone(),
160            singleton_refs: self
161                .singleton_refs
162                .iter()
163                .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
164                .collect(),
165        }
166    }
167
168    pub fn transform_children(
169        &mut self,
170        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
171        seen_tees: &mut SeenSharedNodes,
172    ) {
173        for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
174            transform(ref_node, seen_tees);
175        }
176    }
177
178    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
179    /// replacing local singleton ref idents with `#{N} dfir_ident` or `#{N} mut dfir_ident` references.
180    #[cfg(feature = "build")]
181    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
182        if self.singleton_refs.is_empty() {
183            self.expr.0.to_token_stream()
184        } else {
185            assert!(
186                ident_stack.len() >= self.singleton_refs.len(),
187                "ident_stack has {} entries but expected at least {} for singleton_refs",
188                ident_stack.len(),
189                self.singleton_refs.len()
190            );
191            let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
192
193            let mut let_bindings = Vec::new();
194            for ((i, (ref_node, is_mut)), ref_ident) in
195                self.singleton_refs.iter().enumerate().zip(ref_idents)
196            {
197                let HydroNode::Reference { access_counter, .. } = ref_node else {
198                    panic!("ClosureExpression expected references to `HydroNode::Reference`");
199                };
200                let group = access_counter.frozen_group();
201                // TODO(mingwei): proper spanning?
202                let local_ident = handoff_ref_ident(i);
203                let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
204                let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
205                let mut_token = is_mut.then(|| quote!(mut));
206                let binding = quote! {
207                    let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
208                };
209                let_bindings.push(binding);
210            }
211
212            let expr = &self.expr.0;
213            quote! {
214                {
215                    #( #let_bindings )*
216                    #expr
217                }
218            }
219        }
220    }
221}
222
223/// Wrapper that displays only the tokens of a parsed expr.
224///
225/// Boxes `syn::Type` which is ~240 bytes.
226#[derive(Clone, Hash)]
227pub struct DebugExpr(pub Box<syn::Expr>);
228
229impl serde::Serialize for DebugExpr {
230    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
231        serializer.serialize_str(&self.to_string())
232    }
233}
234
235impl From<syn::Expr> for DebugExpr {
236    fn from(expr: syn::Expr) -> Self {
237        Self(Box::new(expr))
238    }
239}
240
241impl Deref for DebugExpr {
242    type Target = syn::Expr;
243
244    fn deref(&self) -> &Self::Target {
245        &self.0
246    }
247}
248
249impl ToTokens for DebugExpr {
250    fn to_tokens(&self, tokens: &mut TokenStream) {
251        self.0.to_tokens(tokens);
252    }
253}
254
255impl Debug for DebugExpr {
256    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257        write!(f, "{}", self.0.to_token_stream())
258    }
259}
260
261impl Display for DebugExpr {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        let original = self.0.as_ref().clone();
264        let simplified = simplify_q_macro(original);
265
266        // For now, just use quote formatting without trying to parse as a statement
267        // This avoids the syn::parse_quote! issues entirely
268        write!(f, "q!({})", quote::quote!(#simplified))
269    }
270}
271
272/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
273fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
274    if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
275        // Look for calls to stageleft::runtime_support::fn*
276        && is_stageleft_runtime_support_call(&path_expr.path)
277        && let syn::Expr::Block(b) = &call.args[0]
278        && b.block.stmts.len() == 3
279        && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
280    // skip the first two, which are imports
281    {
282        let mut e = e.clone();
283        while let syn::Expr::Block(ref mut block) = e
284            && block.block.stmts.len() == 1
285            && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
286        {
287            e = inner_e;
288        }
289
290        e
291    } else {
292        expr
293    }
294}
295
296fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
297    // Check if this is a call to stageleft::runtime_support::fn*
298    if let Some(last_segment) = path.segments.last() {
299        let fn_name = last_segment.ident.to_string();
300        path.segments.len() > 2
301            && path.segments[0].ident == "stageleft"
302            && path.segments[1].ident == "runtime_support"
303            && fn_name.contains("_type_hint")
304    } else {
305        false
306    }
307}
308
309/// Debug displays the type's tokens.
310///
311/// Boxes `syn::Type` which is ~320 bytes.
312#[derive(Clone, PartialEq, Eq, Hash)]
313pub struct DebugType(pub Box<syn::Type>);
314
315impl From<syn::Type> for DebugType {
316    fn from(t: syn::Type) -> Self {
317        Self(Box::new(t))
318    }
319}
320
321impl Deref for DebugType {
322    type Target = syn::Type;
323
324    fn deref(&self) -> &Self::Target {
325        &self.0
326    }
327}
328
329impl ToTokens for DebugType {
330    fn to_tokens(&self, tokens: &mut TokenStream) {
331        self.0.to_tokens(tokens);
332    }
333}
334
335impl Debug for DebugType {
336    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337        write!(f, "{}", self.0.to_token_stream())
338    }
339}
340
341impl serde::Serialize for DebugType {
342    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
343        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
344    }
345}
346
347fn serialize_backtrace_as_span<S: serde::Serializer>(
348    backtrace: &Backtrace,
349    serializer: S,
350) -> Result<S::Ok, S::Error> {
351    match backtrace.format_span() {
352        Some(span) => serializer.serialize_some(&span),
353        None => serializer.serialize_none(),
354    }
355}
356
357fn serialize_ident<S: serde::Serializer>(
358    ident: &syn::Ident,
359    serializer: S,
360) -> Result<S::Ok, S::Error> {
361    serializer.serialize_str(&ident.to_string())
362}
363
364pub enum DebugInstantiate {
365    Building,
366    Finalized(Box<DebugInstantiateFinalized>),
367}
368
369impl serde::Serialize for DebugInstantiate {
370    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
371        match self {
372            DebugInstantiate::Building => {
373                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
374            }
375            DebugInstantiate::Finalized(_) => {
376                panic!(
377                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
378                )
379            }
380        }
381    }
382}
383
384#[cfg_attr(
385    not(feature = "build"),
386    expect(
387        dead_code,
388        reason = "sink, source unused without `feature = \"build\"`."
389    )
390)]
391pub struct DebugInstantiateFinalized {
392    sink: syn::Expr,
393    source: syn::Expr,
394    connect_fn: Option<Box<dyn FnOnce()>>,
395}
396
397impl From<DebugInstantiateFinalized> for DebugInstantiate {
398    fn from(f: DebugInstantiateFinalized) -> Self {
399        Self::Finalized(Box::new(f))
400    }
401}
402
403impl Debug for DebugInstantiate {
404    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
405        write!(f, "<network instantiate>")
406    }
407}
408
409impl Hash for DebugInstantiate {
410    fn hash<H: Hasher>(&self, _state: &mut H) {
411        // Do nothing
412    }
413}
414
415impl Clone for DebugInstantiate {
416    fn clone(&self) -> Self {
417        match self {
418            DebugInstantiate::Building => DebugInstantiate::Building,
419            DebugInstantiate::Finalized(_) => {
420                panic!("DebugInstantiate::Finalized should not be cloned")
421            }
422        }
423    }
424}
425
426/// Tracks the instantiation state of a `ClusterMembers` source.
427///
428/// During `compile_network`, the first `ClusterMembers` node for a given
429/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
430/// receives the expression returned by `Deploy::cluster_membership_stream`.
431/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
432/// during code-gen they simply reference the tee output of the first node
433/// instead of creating a redundant `source_stream`.
434#[derive(Debug, Hash, Clone, serde::Serialize)]
435pub enum ClusterMembersState {
436    /// Not yet instantiated.
437    Uninit,
438    /// The primary instance: holds the stream expression and will emit
439    /// `source_stream(expr) -> tee()` during code-gen.
440    Stream(DebugExpr),
441    /// A secondary instance that references the tee output of the primary.
442    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
443    /// can derive the deterministic tee ident without extra state.
444    Tee(LocationId, LocationId),
445}
446
447/// A source in a Hydro graph, where data enters the graph.
448#[derive(Debug, Hash, Clone, serde::Serialize)]
449pub enum HydroSource {
450    Stream(DebugExpr),
451    ExternalNetwork(),
452    Iter(DebugExpr),
453    Spin(),
454    ClusterMembers(LocationId, ClusterMembersState),
455    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
456    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
457}
458
459#[cfg(feature = "build")]
460/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
461/// and simulations.
462///
463/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
464/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
465pub trait DfirBuilder {
466    /// Whether the representation of singletons should include intermediate states.
467    fn singleton_intermediates(&self) -> bool;
468
469    /// Gets the DFIR builder for the given location, creating it if necessary.
470    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
471
472    #[expect(clippy::too_many_arguments, reason = "TODO")]
473    fn batch(
474        &mut self,
475        in_ident: syn::Ident,
476        in_location: &LocationId,
477        in_kind: &CollectionKind,
478        out_ident: &syn::Ident,
479        out_location: &LocationId,
480        op_meta: &HydroIrOpMetadata,
481        fold_hooked_idents: &HashSet<String>,
482    );
483    fn yield_from_tick(
484        &mut self,
485        in_ident: syn::Ident,
486        in_location: &LocationId,
487        in_kind: &CollectionKind,
488        out_ident: &syn::Ident,
489        out_location: &LocationId,
490    );
491
492    fn begin_atomic(
493        &mut self,
494        in_ident: syn::Ident,
495        in_location: &LocationId,
496        in_kind: &CollectionKind,
497        out_ident: &syn::Ident,
498        out_location: &LocationId,
499        op_meta: &HydroIrOpMetadata,
500    );
501    fn end_atomic(
502        &mut self,
503        in_ident: syn::Ident,
504        in_location: &LocationId,
505        in_kind: &CollectionKind,
506        out_ident: &syn::Ident,
507    );
508
509    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
510    fn observe_nondet(
511        &mut self,
512        trusted: bool,
513        location: &LocationId,
514        in_ident: syn::Ident,
515        in_kind: &CollectionKind,
516        out_ident: &syn::Ident,
517        out_kind: &CollectionKind,
518        op_meta: &HydroIrOpMetadata,
519    );
520
521    #[expect(clippy::too_many_arguments, reason = "TODO")]
522    fn merge_ordered(
523        &mut self,
524        location: &LocationId,
525        first_ident: syn::Ident,
526        second_ident: syn::Ident,
527        out_ident: &syn::Ident,
528        in_kind: &CollectionKind,
529        op_meta: &HydroIrOpMetadata,
530        operator_tag: Option<&str>,
531    );
532
533    #[expect(clippy::too_many_arguments, reason = "TODO")]
534    fn create_network(
535        &mut self,
536        from: &LocationId,
537        to: &LocationId,
538        input_ident: syn::Ident,
539        out_ident: &syn::Ident,
540        serialize: Option<&DebugExpr>,
541        sink: syn::Expr,
542        source: syn::Expr,
543        deserialize: Option<&DebugExpr>,
544        tag_id: StmtId,
545        networking_info: &crate::networking::NetworkingInfo,
546    );
547
548    fn create_external_source(
549        &mut self,
550        on: &LocationId,
551        source_expr: syn::Expr,
552        out_ident: &syn::Ident,
553        deserialize: Option<&DebugExpr>,
554        tag_id: StmtId,
555    );
556
557    fn create_external_output(
558        &mut self,
559        on: &LocationId,
560        sink_expr: syn::Expr,
561        input_ident: &syn::Ident,
562        serialize: Option<&DebugExpr>,
563        tag_id: StmtId,
564    );
565
566    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
567    /// Returns the new input ident to use for the fold if a hook was emitted.
568    fn emit_fold_hook(
569        &mut self,
570        location: &LocationId,
571        in_ident: &syn::Ident,
572        in_kind: &CollectionKind,
573        op_meta: &HydroIrOpMetadata,
574    ) -> Option<syn::Ident>;
575
576    /// Inserts necessary code to validate a manual assertion that at this point the
577    /// input live collection is consistent. In production, this is a no-op, but in simulation
578    /// this will (not yet implemented) inject assertions that validate consistency.
579    fn assert_is_consistent(
580        &mut self,
581        trusted: bool,
582        location: &LocationId,
583        in_ident: syn::Ident,
584        out_ident: &syn::Ident,
585    );
586}
587
588#[cfg(feature = "build")]
589impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
590    fn singleton_intermediates(&self) -> bool {
591        false
592    }
593
594    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
595        self.entry(location.root().key())
596            .expect("location was removed")
597            .or_default()
598    }
599
600    fn batch(
601        &mut self,
602        in_ident: syn::Ident,
603        in_location: &LocationId,
604        in_kind: &CollectionKind,
605        out_ident: &syn::Ident,
606        _out_location: &LocationId,
607        _op_meta: &HydroIrOpMetadata,
608        _fold_hooked_idents: &HashSet<String>,
609    ) {
610        let builder = self.get_dfir_mut(in_location.root());
611        if in_kind.is_bounded()
612            && matches!(
613                in_kind,
614                CollectionKind::Singleton { .. }
615                    | CollectionKind::Optional { .. }
616                    | CollectionKind::KeyedSingleton { .. }
617            )
618        {
619            assert!(in_location.is_top_level());
620            builder.add_dfir(
621                parse_quote! {
622                    #out_ident = #in_ident -> persist::<'static>();
623                },
624                None,
625                None,
626            );
627        } else {
628            builder.add_dfir(
629                parse_quote! {
630                    #out_ident = #in_ident;
631                },
632                None,
633                None,
634            );
635        }
636    }
637
638    fn yield_from_tick(
639        &mut self,
640        in_ident: syn::Ident,
641        in_location: &LocationId,
642        _in_kind: &CollectionKind,
643        out_ident: &syn::Ident,
644        _out_location: &LocationId,
645    ) {
646        let builder = self.get_dfir_mut(in_location.root());
647        builder.add_dfir(
648            parse_quote! {
649                #out_ident = #in_ident;
650            },
651            None,
652            None,
653        );
654    }
655
656    fn begin_atomic(
657        &mut self,
658        in_ident: syn::Ident,
659        in_location: &LocationId,
660        _in_kind: &CollectionKind,
661        out_ident: &syn::Ident,
662        _out_location: &LocationId,
663        _op_meta: &HydroIrOpMetadata,
664    ) {
665        let builder = self.get_dfir_mut(in_location.root());
666        builder.add_dfir(
667            parse_quote! {
668                #out_ident = #in_ident;
669            },
670            None,
671            None,
672        );
673    }
674
675    fn end_atomic(
676        &mut self,
677        in_ident: syn::Ident,
678        in_location: &LocationId,
679        _in_kind: &CollectionKind,
680        out_ident: &syn::Ident,
681    ) {
682        let builder = self.get_dfir_mut(in_location.root());
683        builder.add_dfir(
684            parse_quote! {
685                #out_ident = #in_ident;
686            },
687            None,
688            None,
689        );
690    }
691
692    fn observe_nondet(
693        &mut self,
694        _trusted: bool,
695        location: &LocationId,
696        in_ident: syn::Ident,
697        _in_kind: &CollectionKind,
698        out_ident: &syn::Ident,
699        _out_kind: &CollectionKind,
700        _op_meta: &HydroIrOpMetadata,
701    ) {
702        let builder = self.get_dfir_mut(location);
703        builder.add_dfir(
704            parse_quote! {
705                #out_ident = #in_ident;
706            },
707            None,
708            None,
709        );
710    }
711
712    fn merge_ordered(
713        &mut self,
714        location: &LocationId,
715        first_ident: syn::Ident,
716        second_ident: syn::Ident,
717        out_ident: &syn::Ident,
718        _in_kind: &CollectionKind,
719        _op_meta: &HydroIrOpMetadata,
720        operator_tag: Option<&str>,
721    ) {
722        let builder = self.get_dfir_mut(location);
723        builder.add_dfir(
724            parse_quote! {
725                #out_ident = union();
726                #first_ident -> [0]#out_ident;
727                #second_ident -> [1]#out_ident;
728            },
729            None,
730            operator_tag,
731        );
732    }
733
734    fn create_network(
735        &mut self,
736        from: &LocationId,
737        to: &LocationId,
738        input_ident: syn::Ident,
739        out_ident: &syn::Ident,
740        serialize: Option<&DebugExpr>,
741        sink: syn::Expr,
742        source: syn::Expr,
743        deserialize: Option<&DebugExpr>,
744        tag_id: StmtId,
745        _networking_info: &crate::networking::NetworkingInfo,
746    ) {
747        let sender_builder = self.get_dfir_mut(from);
748        if let Some(serialize_pipeline) = serialize {
749            sender_builder.add_dfir(
750                parse_quote! {
751                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
752                },
753                None,
754                // operator tag separates send and receive, which otherwise have the same next_stmt_id
755                Some(&format!("send{}", tag_id)),
756            );
757        } else {
758            sender_builder.add_dfir(
759                parse_quote! {
760                    #input_ident -> dest_sink(#sink);
761                },
762                None,
763                Some(&format!("send{}", tag_id)),
764            );
765        }
766
767        let receiver_builder = self.get_dfir_mut(to);
768        if let Some(deserialize_pipeline) = deserialize {
769            receiver_builder.add_dfir(
770                parse_quote! {
771                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
772                },
773                None,
774                Some(&format!("recv{}", tag_id)),
775            );
776        } else {
777            receiver_builder.add_dfir(
778                parse_quote! {
779                    #out_ident = source_stream(#source);
780                },
781                None,
782                Some(&format!("recv{}", tag_id)),
783            );
784        }
785    }
786
787    fn create_external_source(
788        &mut self,
789        on: &LocationId,
790        source_expr: syn::Expr,
791        out_ident: &syn::Ident,
792        deserialize: Option<&DebugExpr>,
793        tag_id: StmtId,
794    ) {
795        let receiver_builder = self.get_dfir_mut(on);
796        if let Some(deserialize_pipeline) = deserialize {
797            receiver_builder.add_dfir(
798                parse_quote! {
799                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
800                },
801                None,
802                Some(&format!("recv{}", tag_id)),
803            );
804        } else {
805            receiver_builder.add_dfir(
806                parse_quote! {
807                    #out_ident = source_stream(#source_expr);
808                },
809                None,
810                Some(&format!("recv{}", tag_id)),
811            );
812        }
813    }
814
815    fn create_external_output(
816        &mut self,
817        on: &LocationId,
818        sink_expr: syn::Expr,
819        input_ident: &syn::Ident,
820        serialize: Option<&DebugExpr>,
821        tag_id: StmtId,
822    ) {
823        let sender_builder = self.get_dfir_mut(on);
824        if let Some(serialize_fn) = serialize {
825            sender_builder.add_dfir(
826                parse_quote! {
827                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
828                },
829                None,
830                // operator tag separates send and receive, which otherwise have the same next_stmt_id
831                Some(&format!("send{}", tag_id)),
832            );
833        } else {
834            sender_builder.add_dfir(
835                parse_quote! {
836                    #input_ident -> dest_sink(#sink_expr);
837                },
838                None,
839                Some(&format!("send{}", tag_id)),
840            );
841        }
842    }
843
844    fn emit_fold_hook(
845        &mut self,
846        _location: &LocationId,
847        _in_ident: &syn::Ident,
848        _in_kind: &CollectionKind,
849        _op_meta: &HydroIrOpMetadata,
850    ) -> Option<syn::Ident> {
851        None
852    }
853
854    fn assert_is_consistent(
855        &mut self,
856        _trusted: bool,
857        location: &LocationId,
858        in_ident: syn::Ident,
859        out_ident: &syn::Ident,
860    ) {
861        let builder = self.get_dfir_mut(location);
862        builder.add_dfir(
863            parse_quote! {
864                #out_ident = #in_ident;
865            },
866            None,
867            None,
868        );
869    }
870}
871
872#[cfg(feature = "build")]
873pub enum BuildersOrCallback<'a, L, N>
874where
875    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
876    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
877{
878    Builders(&'a mut dyn DfirBuilder),
879    Callback(L, N),
880}
881
882/// An root in a Hydro graph, which is an pipeline that doesn't emit
883/// any downstream values. Traversals over the dataflow graph and
884/// generating DFIR IR start from roots.
885#[derive(Debug, Hash, serde::Serialize)]
886pub enum HydroRoot {
887    ForEach {
888        f: ClosureExpr,
889        input: Box<HydroNode>,
890        op_metadata: HydroIrOpMetadata,
891    },
892    SendExternal {
893        to_external_key: LocationKey,
894        to_port_id: ExternalPortId,
895        to_many: bool,
896        unpaired: bool,
897        serialize_fn: Option<DebugExpr>,
898        instantiate_fn: DebugInstantiate,
899        input: Box<HydroNode>,
900        op_metadata: HydroIrOpMetadata,
901    },
902    DestSink {
903        sink: DebugExpr,
904        input: Box<HydroNode>,
905        op_metadata: HydroIrOpMetadata,
906    },
907    CycleSink {
908        cycle_id: CycleId,
909        input: Box<HydroNode>,
910        op_metadata: HydroIrOpMetadata,
911    },
912    EmbeddedOutput {
913        #[serde(serialize_with = "serialize_ident")]
914        ident: syn::Ident,
915        input: Box<HydroNode>,
916        op_metadata: HydroIrOpMetadata,
917    },
918    Null {
919        input: Box<HydroNode>,
920        op_metadata: HydroIrOpMetadata,
921    },
922}
923
924impl HydroRoot {
925    #[cfg(feature = "build")]
926    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
927    pub fn compile_network<'a, D>(
928        &mut self,
929        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
930        seen_tees: &mut SeenSharedNodes,
931        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
932        processes: &SparseSecondaryMap<LocationKey, D::Process>,
933        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
934        externals: &SparseSecondaryMap<LocationKey, D::External>,
935        env: &mut D::InstantiateEnv,
936    ) where
937        D: Deploy<'a>,
938    {
939        let refcell_extra_stmts = RefCell::new(extra_stmts);
940        let refcell_env = RefCell::new(env);
941        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
942        self.transform_bottom_up(
943            &mut |l| {
944                if let HydroRoot::SendExternal {
945                    input,
946                    to_external_key,
947                    to_port_id,
948                    to_many,
949                    unpaired,
950                    instantiate_fn,
951                    ..
952                } = l
953                {
954                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
955                        DebugInstantiate::Building => {
956                            let to_node = externals
957                                .get(*to_external_key)
958                                .unwrap_or_else(|| {
959                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
960                                })
961                                .clone();
962
963                            match input.metadata().location_id.root() {
964                                &LocationId::Process(process_key) => {
965                                    if *to_many {
966                                        (
967                                            (
968                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
969                                                parse_quote!(DUMMY),
970                                            ),
971                                            Box::new(|| {}) as Box<dyn FnOnce()>,
972                                        )
973                                    } else {
974                                        let from_node = processes
975                                            .get(process_key)
976                                            .unwrap_or_else(|| {
977                                                panic!("A process used in the graph was not instantiated: {}", process_key)
978                                            })
979                                            .clone();
980
981                                        let sink_port = from_node.next_port();
982                                        let source_port = to_node.next_port();
983
984                                        if *unpaired {
985                                            use stageleft::quote_type;
986                                            use tokio_util::codec::LengthDelimitedCodec;
987
988                                            to_node.register(*to_port_id, source_port.clone());
989
990                                            let _ = D::e2o_source(
991                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
992                                                &to_node, &source_port,
993                                                &from_node, &sink_port,
994                                                &quote_type::<LengthDelimitedCodec>(),
995                                                format!("{}_{}", *to_external_key, *to_port_id)
996                                            );
997                                        }
998
999                                        (
1000                                            (
1001                                                D::o2e_sink(
1002                                                    &from_node,
1003                                                    &sink_port,
1004                                                    &to_node,
1005                                                    &source_port,
1006                                                    format!("{}_{}", *to_external_key, *to_port_id)
1007                                                ),
1008                                                parse_quote!(DUMMY),
1009                                            ),
1010                                            if *unpaired {
1011                                                D::e2o_connect(
1012                                                    &to_node,
1013                                                    &source_port,
1014                                                    &from_node,
1015                                                    &sink_port,
1016                                                    *to_many,
1017                                                    NetworkHint::Auto,
1018                                                )
1019                                            } else {
1020                                                Box::new(|| {}) as Box<dyn FnOnce()>
1021                                            },
1022                                        )
1023                                    }
1024                                }
1025                                LocationId::Cluster(cluster_key) => {
1026                                    let from_node = clusters
1027                                        .get(*cluster_key)
1028                                        .unwrap_or_else(|| {
1029                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1030                                        })
1031                                        .clone();
1032
1033                                    let sink_port = from_node.next_port();
1034                                    let source_port = to_node.next_port();
1035
1036                                    if *unpaired {
1037                                        to_node.register(*to_port_id, source_port.clone());
1038                                    }
1039
1040                                    (
1041                                        (
1042                                            D::m2e_sink(
1043                                                &from_node,
1044                                                &sink_port,
1045                                                &to_node,
1046                                                &source_port,
1047                                                format!("{}_{}", *to_external_key, *to_port_id)
1048                                            ),
1049                                            parse_quote!(DUMMY),
1050                                        ),
1051                                        Box::new(|| {}) as Box<dyn FnOnce()>,
1052                                    )
1053                                }
1054                                _ => panic!()
1055                            }
1056                        },
1057
1058                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1059                    };
1060
1061                    *instantiate_fn = DebugInstantiateFinalized {
1062                        sink: sink_expr,
1063                        source: source_expr,
1064                        connect_fn: Some(connect_fn),
1065                    }
1066                    .into();
1067                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1068                    let element_type = match &input.metadata().collection_kind {
1069                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1070                        _ => panic!("Embedded output must have Stream collection kind"),
1071                    };
1072                    let location_key = match input.metadata().location_id.root() {
1073                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1074                        _ => panic!("Embedded output must be on a process or cluster"),
1075                    };
1076                    D::register_embedded_output(
1077                        &mut refcell_env.borrow_mut(),
1078                        location_key,
1079                        ident,
1080                        &element_type,
1081                    );
1082                }
1083            },
1084            &mut |n| {
1085                if let HydroNode::Network {
1086                    name,
1087                    networking_info,
1088                    input,
1089                    instantiate_fn,
1090                    metadata,
1091                    ..
1092                } = n
1093                {
1094                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1095                        DebugInstantiate::Building => instantiate_network::<D>(
1096                            &mut refcell_env.borrow_mut(),
1097                            input.metadata().location_id.root(),
1098                            metadata.location_id.root(),
1099                            processes,
1100                            clusters,
1101                            name.as_deref(),
1102                            networking_info,
1103                        ),
1104
1105                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1106                    };
1107
1108                    *instantiate_fn = DebugInstantiateFinalized {
1109                        sink: sink_expr,
1110                        source: source_expr,
1111                        connect_fn: Some(connect_fn),
1112                    }
1113                    .into();
1114                } else if let HydroNode::ExternalInput {
1115                    from_external_key,
1116                    from_port_id,
1117                    from_many,
1118                    codec_type,
1119                    port_hint,
1120                    instantiate_fn,
1121                    metadata,
1122                    ..
1123                } = n
1124                {
1125                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1126                        DebugInstantiate::Building => {
1127                            let from_node = externals
1128                                .get(*from_external_key)
1129                                .unwrap_or_else(|| {
1130                                    panic!(
1131                                        "A external used in the graph was not instantiated: {}",
1132                                        from_external_key,
1133                                    )
1134                                })
1135                                .clone();
1136
1137                            match metadata.location_id.root() {
1138                                &LocationId::Process(process_key) => {
1139                                    let to_node = processes
1140                                        .get(process_key)
1141                                        .unwrap_or_else(|| {
1142                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1143                                        })
1144                                        .clone();
1145
1146                                    let sink_port = from_node.next_port();
1147                                    let source_port = to_node.next_port();
1148
1149                                    from_node.register(*from_port_id, sink_port.clone());
1150
1151                                    (
1152                                        (
1153                                            parse_quote!(DUMMY),
1154                                            if *from_many {
1155                                                D::e2o_many_source(
1156                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1157                                                    &to_node, &source_port,
1158                                                    codec_type.0.as_ref(),
1159                                                    format!("{}_{}", *from_external_key, *from_port_id)
1160                                                )
1161                                            } else {
1162                                                D::e2o_source(
1163                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1164                                                    &from_node, &sink_port,
1165                                                    &to_node, &source_port,
1166                                                    codec_type.0.as_ref(),
1167                                                    format!("{}_{}", *from_external_key, *from_port_id)
1168                                                )
1169                                            },
1170                                        ),
1171                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1172                                    )
1173                                }
1174                                LocationId::Cluster(cluster_key) => {
1175                                    let to_node = clusters
1176                                        .get(*cluster_key)
1177                                        .unwrap_or_else(|| {
1178                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1179                                        })
1180                                        .clone();
1181
1182                                    let sink_port = from_node.next_port();
1183                                    let source_port = to_node.next_port();
1184
1185                                    from_node.register(*from_port_id, sink_port.clone());
1186
1187                                    (
1188                                        (
1189                                            parse_quote!(DUMMY),
1190                                            D::e2m_source(
1191                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1192                                                &from_node, &sink_port,
1193                                                &to_node, &source_port,
1194                                                codec_type.0.as_ref(),
1195                                                format!("{}_{}", *from_external_key, *from_port_id)
1196                                            ),
1197                                        ),
1198                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1199                                    )
1200                                }
1201                                _ => panic!()
1202                            }
1203                        },
1204
1205                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1206                    };
1207
1208                    *instantiate_fn = DebugInstantiateFinalized {
1209                        sink: sink_expr,
1210                        source: source_expr,
1211                        connect_fn: Some(connect_fn),
1212                    }
1213                    .into();
1214                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1215                    let element_type = match &metadata.collection_kind {
1216                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1217                        _ => panic!("Embedded source must have Stream collection kind"),
1218                    };
1219                    let location_key = match metadata.location_id.root() {
1220                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1221                        _ => panic!("Embedded source must be on a process or cluster"),
1222                    };
1223                    D::register_embedded_stream_input(
1224                        &mut refcell_env.borrow_mut(),
1225                        location_key,
1226                        ident,
1227                        &element_type,
1228                    );
1229                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1230                    let element_type = match &metadata.collection_kind {
1231                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1232                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1233                    };
1234                    let location_key = match metadata.location_id.root() {
1235                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1236                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1237                    };
1238                    D::register_embedded_singleton_input(
1239                        &mut refcell_env.borrow_mut(),
1240                        location_key,
1241                        ident,
1242                        &element_type,
1243                    );
1244                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1245                    match state {
1246                        ClusterMembersState::Uninit => {
1247                            let at_location = metadata.location_id.root().clone();
1248                            let key = (at_location.clone(), location_id.key());
1249                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1250                                // First occurrence: call cluster_membership_stream and mark as Stream.
1251                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1252                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1253                                    &(),
1254                                );
1255                                *state = ClusterMembersState::Stream(expr.into());
1256                            } else {
1257                                // Already instantiated for this (at, target) pair: just tee.
1258                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1259                            }
1260                        }
1261                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1262                            panic!("cluster members already finalized");
1263                        }
1264                    }
1265                }
1266            },
1267            seen_tees,
1268            false,
1269        );
1270    }
1271
1272    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1273        self.transform_bottom_up(
1274            &mut |l| {
1275                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1276                    match instantiate_fn {
1277                        DebugInstantiate::Building => panic!("network not built"),
1278
1279                        DebugInstantiate::Finalized(finalized) => {
1280                            (finalized.connect_fn.take().unwrap())();
1281                        }
1282                    }
1283                }
1284            },
1285            &mut |n| {
1286                if let HydroNode::Network { instantiate_fn, .. }
1287                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1288                {
1289                    match instantiate_fn {
1290                        DebugInstantiate::Building => panic!("network not built"),
1291
1292                        DebugInstantiate::Finalized(finalized) => {
1293                            (finalized.connect_fn.take().unwrap())();
1294                        }
1295                    }
1296                }
1297            },
1298            seen_tees,
1299            false,
1300        );
1301    }
1302
1303    pub fn transform_bottom_up(
1304        &mut self,
1305        transform_root: &mut impl FnMut(&mut HydroRoot),
1306        transform_node: &mut impl FnMut(&mut HydroNode),
1307        seen_tees: &mut SeenSharedNodes,
1308        check_well_formed: bool,
1309    ) {
1310        self.transform_children(
1311            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1312            seen_tees,
1313        );
1314
1315        transform_root(self);
1316    }
1317
1318    pub fn transform_children(
1319        &mut self,
1320        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1321        seen_tees: &mut SeenSharedNodes,
1322    ) {
1323        match self {
1324            HydroRoot::ForEach { f, input, .. } => {
1325                f.transform_children(&mut transform, seen_tees);
1326                transform(input, seen_tees);
1327            }
1328            HydroRoot::SendExternal { input, .. }
1329            | HydroRoot::DestSink { input, .. }
1330            | HydroRoot::CycleSink { input, .. }
1331            | HydroRoot::EmbeddedOutput { input, .. }
1332            | HydroRoot::Null { input, .. } => {
1333                transform(input, seen_tees);
1334            }
1335        }
1336    }
1337
1338    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1339        match self {
1340            HydroRoot::ForEach {
1341                f,
1342                input,
1343                op_metadata,
1344            } => HydroRoot::ForEach {
1345                f: f.deep_clone(seen_tees),
1346                input: Box::new(input.deep_clone(seen_tees)),
1347                op_metadata: op_metadata.clone(),
1348            },
1349            HydroRoot::SendExternal {
1350                to_external_key,
1351                to_port_id,
1352                to_many,
1353                unpaired,
1354                serialize_fn,
1355                instantiate_fn,
1356                input,
1357                op_metadata,
1358            } => HydroRoot::SendExternal {
1359                to_external_key: *to_external_key,
1360                to_port_id: *to_port_id,
1361                to_many: *to_many,
1362                unpaired: *unpaired,
1363                serialize_fn: serialize_fn.clone(),
1364                instantiate_fn: instantiate_fn.clone(),
1365                input: Box::new(input.deep_clone(seen_tees)),
1366                op_metadata: op_metadata.clone(),
1367            },
1368            HydroRoot::DestSink {
1369                sink,
1370                input,
1371                op_metadata,
1372            } => HydroRoot::DestSink {
1373                sink: sink.clone(),
1374                input: Box::new(input.deep_clone(seen_tees)),
1375                op_metadata: op_metadata.clone(),
1376            },
1377            HydroRoot::CycleSink {
1378                cycle_id,
1379                input,
1380                op_metadata,
1381            } => HydroRoot::CycleSink {
1382                cycle_id: *cycle_id,
1383                input: Box::new(input.deep_clone(seen_tees)),
1384                op_metadata: op_metadata.clone(),
1385            },
1386            HydroRoot::EmbeddedOutput {
1387                ident,
1388                input,
1389                op_metadata,
1390            } => HydroRoot::EmbeddedOutput {
1391                ident: ident.clone(),
1392                input: Box::new(input.deep_clone(seen_tees)),
1393                op_metadata: op_metadata.clone(),
1394            },
1395            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1396                input: Box::new(input.deep_clone(seen_tees)),
1397                op_metadata: op_metadata.clone(),
1398            },
1399        }
1400    }
1401
1402    #[cfg(feature = "build")]
1403    pub fn emit(
1404        &mut self,
1405        graph_builders: &mut dyn DfirBuilder,
1406        seen_tees: &mut SeenSharedNodes,
1407        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1408        next_stmt_id: &mut crate::Counter<StmtId>,
1409        fold_hooked_idents: &mut HashSet<String>,
1410    ) {
1411        self.emit_core(
1412            &mut BuildersOrCallback::<
1413                fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1414                fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1415            >::Builders(graph_builders),
1416            seen_tees,
1417            built_tees,
1418            next_stmt_id,
1419            fold_hooked_idents,
1420        );
1421    }
1422
1423    #[cfg(feature = "build")]
1424    pub fn emit_core(
1425        &mut self,
1426        builders_or_callback: &mut BuildersOrCallback<
1427            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1428            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1429        >,
1430        seen_tees: &mut SeenSharedNodes,
1431        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1432        next_stmt_id: &mut crate::Counter<StmtId>,
1433        fold_hooked_idents: &mut HashSet<String>,
1434    ) {
1435        match self {
1436            HydroRoot::ForEach { f, input, .. } => {
1437                let input_ident = input.emit_core(
1438                    builders_or_callback,
1439                    seen_tees,
1440                    built_tees,
1441                    next_stmt_id,
1442                    fold_hooked_idents,
1443                );
1444
1445                let stmt_id = next_stmt_id.get_and_increment();
1446
1447                match builders_or_callback {
1448                    BuildersOrCallback::Builders(graph_builders) => {
1449                        let mut ident_stack: Vec<syn::Ident> = Vec::new();
1450
1451                        // Look up each captured ref's ident from built_tees
1452                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
1453                            let HydroNode::Reference { inner, .. } = ref_node else {
1454                                panic!("singleton_refs should only contain HydroNode::Reference");
1455                            };
1456                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1457                            let idents = built_tees.get(&ptr).expect(
1458                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
1459                            );
1460                            ident_stack.push(idents[0].clone());
1461                        }
1462
1463                        let f_tokens = f.emit_tokens(&mut ident_stack);
1464
1465                        graph_builders
1466                            .get_dfir_mut(&input.metadata().location_id)
1467                            .add_dfir(
1468                                parse_quote! {
1469                                    #input_ident -> for_each(#f_tokens);
1470                                },
1471                                None,
1472                                Some(&stmt_id.to_string()),
1473                            );
1474                    }
1475                    BuildersOrCallback::Callback(leaf_callback, _) => {
1476                        leaf_callback(self, next_stmt_id);
1477                    }
1478                }
1479            }
1480
1481            HydroRoot::SendExternal {
1482                serialize_fn,
1483                instantiate_fn,
1484                input,
1485                ..
1486            } => {
1487                let input_ident = input.emit_core(
1488                    builders_or_callback,
1489                    seen_tees,
1490                    built_tees,
1491                    next_stmt_id,
1492                    fold_hooked_idents,
1493                );
1494
1495                let stmt_id = next_stmt_id.get_and_increment();
1496
1497                match builders_or_callback {
1498                    BuildersOrCallback::Builders(graph_builders) => {
1499                        let (sink_expr, _) = match instantiate_fn {
1500                            DebugInstantiate::Building => (
1501                                syn::parse_quote!(DUMMY_SINK),
1502                                syn::parse_quote!(DUMMY_SOURCE),
1503                            ),
1504
1505                            DebugInstantiate::Finalized(finalized) => {
1506                                (finalized.sink.clone(), finalized.source.clone())
1507                            }
1508                        };
1509
1510                        graph_builders.create_external_output(
1511                            &input.metadata().location_id,
1512                            sink_expr,
1513                            &input_ident,
1514                            serialize_fn.as_ref(),
1515                            stmt_id,
1516                        );
1517                    }
1518                    BuildersOrCallback::Callback(leaf_callback, _) => {
1519                        leaf_callback(self, next_stmt_id);
1520                    }
1521                }
1522            }
1523
1524            HydroRoot::DestSink { sink, input, .. } => {
1525                let input_ident = input.emit_core(
1526                    builders_or_callback,
1527                    seen_tees,
1528                    built_tees,
1529                    next_stmt_id,
1530                    fold_hooked_idents,
1531                );
1532
1533                let stmt_id = next_stmt_id.get_and_increment();
1534
1535                match builders_or_callback {
1536                    BuildersOrCallback::Builders(graph_builders) => {
1537                        graph_builders
1538                            .get_dfir_mut(&input.metadata().location_id)
1539                            .add_dfir(
1540                                parse_quote! {
1541                                    #input_ident -> dest_sink(#sink);
1542                                },
1543                                None,
1544                                Some(&stmt_id.to_string()),
1545                            );
1546                    }
1547                    BuildersOrCallback::Callback(leaf_callback, _) => {
1548                        leaf_callback(self, next_stmt_id);
1549                    }
1550                }
1551            }
1552
1553            HydroRoot::CycleSink {
1554                cycle_id, input, ..
1555            } => {
1556                let input_ident = input.emit_core(
1557                    builders_or_callback,
1558                    seen_tees,
1559                    built_tees,
1560                    next_stmt_id,
1561                    fold_hooked_idents,
1562                );
1563
1564                match builders_or_callback {
1565                    BuildersOrCallback::Builders(graph_builders) => {
1566                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1567                            CollectionKind::KeyedSingleton {
1568                                key_type,
1569                                value_type,
1570                                ..
1571                            }
1572                            | CollectionKind::KeyedStream {
1573                                key_type,
1574                                value_type,
1575                                ..
1576                            } => {
1577                                parse_quote!((#key_type, #value_type))
1578                            }
1579                            CollectionKind::Stream { element_type, .. }
1580                            | CollectionKind::Singleton { element_type, .. }
1581                            | CollectionKind::Optional { element_type, .. } => {
1582                                parse_quote!(#element_type)
1583                            }
1584                        };
1585
1586                        let cycle_id_ident = cycle_id.as_ident();
1587                        graph_builders
1588                            .get_dfir_mut(&input.metadata().location_id)
1589                            .add_dfir(
1590                                parse_quote! {
1591                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1592                                },
1593                                None,
1594                                None,
1595                            );
1596                    }
1597                    // No ID, no callback
1598                    BuildersOrCallback::Callback(_, _) => {}
1599                }
1600            }
1601
1602            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1603                let input_ident = input.emit_core(
1604                    builders_or_callback,
1605                    seen_tees,
1606                    built_tees,
1607                    next_stmt_id,
1608                    fold_hooked_idents,
1609                );
1610
1611                let stmt_id = next_stmt_id.get_and_increment();
1612
1613                match builders_or_callback {
1614                    BuildersOrCallback::Builders(graph_builders) => {
1615                        graph_builders
1616                            .get_dfir_mut(&input.metadata().location_id)
1617                            .add_dfir(
1618                                parse_quote! {
1619                                    #input_ident -> for_each(&mut #ident);
1620                                },
1621                                None,
1622                                Some(&stmt_id.to_string()),
1623                            );
1624                    }
1625                    BuildersOrCallback::Callback(leaf_callback, _) => {
1626                        leaf_callback(self, next_stmt_id);
1627                    }
1628                }
1629            }
1630
1631            HydroRoot::Null { input, .. } => {
1632                let input_ident = input.emit_core(
1633                    builders_or_callback,
1634                    seen_tees,
1635                    built_tees,
1636                    next_stmt_id,
1637                    fold_hooked_idents,
1638                );
1639
1640                let stmt_id = next_stmt_id.get_and_increment();
1641
1642                match builders_or_callback {
1643                    BuildersOrCallback::Builders(graph_builders) => {
1644                        graph_builders
1645                            .get_dfir_mut(&input.metadata().location_id)
1646                            .add_dfir(
1647                                parse_quote! {
1648                                    #input_ident -> for_each(|_| {});
1649                                },
1650                                None,
1651                                Some(&stmt_id.to_string()),
1652                            );
1653                    }
1654                    BuildersOrCallback::Callback(leaf_callback, _) => {
1655                        leaf_callback(self, next_stmt_id);
1656                    }
1657                }
1658            }
1659        }
1660    }
1661
1662    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1663        match self {
1664            HydroRoot::ForEach { op_metadata, .. }
1665            | HydroRoot::SendExternal { op_metadata, .. }
1666            | HydroRoot::DestSink { op_metadata, .. }
1667            | HydroRoot::CycleSink { op_metadata, .. }
1668            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1669            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1670        }
1671    }
1672
1673    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1674        match self {
1675            HydroRoot::ForEach { op_metadata, .. }
1676            | HydroRoot::SendExternal { op_metadata, .. }
1677            | HydroRoot::DestSink { op_metadata, .. }
1678            | HydroRoot::CycleSink { op_metadata, .. }
1679            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1680            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1681        }
1682    }
1683
1684    pub fn input(&self) -> &HydroNode {
1685        match self {
1686            HydroRoot::ForEach { input, .. }
1687            | HydroRoot::SendExternal { input, .. }
1688            | HydroRoot::DestSink { input, .. }
1689            | HydroRoot::CycleSink { input, .. }
1690            | HydroRoot::EmbeddedOutput { input, .. }
1691            | HydroRoot::Null { input, .. } => input,
1692        }
1693    }
1694
1695    pub fn input_metadata(&self) -> &HydroIrMetadata {
1696        self.input().metadata()
1697    }
1698
1699    pub fn print_root(&self) -> String {
1700        match self {
1701            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1702            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1703            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1704            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1705            HydroRoot::EmbeddedOutput { ident, .. } => {
1706                format!("EmbeddedOutput({})", ident)
1707            }
1708            HydroRoot::Null { .. } => "Null".to_owned(),
1709        }
1710    }
1711
1712    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1713        match self {
1714            HydroRoot::ForEach { f, .. } => {
1715                transform(&mut f.expr);
1716            }
1717            HydroRoot::DestSink { sink, .. } => {
1718                transform(sink);
1719            }
1720            HydroRoot::SendExternal { .. }
1721            | HydroRoot::CycleSink { .. }
1722            | HydroRoot::EmbeddedOutput { .. }
1723            | HydroRoot::Null { .. } => {}
1724        }
1725    }
1726}
1727
1728#[cfg(feature = "build")]
1729fn tick_of(loc: &LocationId) -> Option<ClockId> {
1730    match loc {
1731        LocationId::Tick(id, _) => Some(*id),
1732        LocationId::Atomic(inner) => tick_of(inner),
1733        _ => None,
1734    }
1735}
1736
1737#[cfg(feature = "build")]
1738fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1739    match loc {
1740        LocationId::Tick(id, inner) => {
1741            *id = uf_find(uf, *id);
1742            remap_location(inner, uf);
1743        }
1744        LocationId::Atomic(inner) => {
1745            remap_location(inner, uf);
1746        }
1747        LocationId::Process(_) | LocationId::Cluster(_) => {}
1748    }
1749}
1750
1751#[cfg(feature = "build")]
1752fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1753    let p = *parent.get(&x).unwrap_or(&x);
1754    if p == x {
1755        return x;
1756    }
1757    let root = uf_find(parent, p);
1758    parent.insert(x, root);
1759    root
1760}
1761
1762#[cfg(feature = "build")]
1763fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1764    let ra = uf_find(parent, a);
1765    let rb = uf_find(parent, b);
1766    if ra != rb {
1767        parent.insert(ra, rb);
1768    }
1769}
1770
1771/// Traverse the IR to build a union-find that unifies tick IDs connected
1772/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
1773/// rewrite all `LocationId`s to use the representative tick ID.
1774#[cfg(feature = "build")]
1775pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1776    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1777
1778    // Pass 1: collect unifications.
1779    transform_bottom_up(
1780        ir,
1781        &mut |_| {},
1782        &mut |node: &mut HydroNode| match node {
1783            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1784                if let (Some(a), Some(b)) = (
1785                    tick_of(&inner.metadata().location_id),
1786                    tick_of(&metadata.location_id),
1787                ) {
1788                    uf_union(&mut uf, a, b);
1789                }
1790            }
1791            HydroNode::Chain {
1792                first,
1793                second,
1794                metadata,
1795            }
1796            | HydroNode::ChainFirst {
1797                first,
1798                second,
1799                metadata,
1800            }
1801            | HydroNode::MergeOrdered {
1802                first,
1803                second,
1804                metadata,
1805            } => {
1806                if let (Some(a), Some(b)) = (
1807                    tick_of(&first.metadata().location_id),
1808                    tick_of(&metadata.location_id),
1809                ) {
1810                    uf_union(&mut uf, a, b);
1811                }
1812                if let (Some(a), Some(b)) = (
1813                    tick_of(&second.metadata().location_id),
1814                    tick_of(&metadata.location_id),
1815                ) {
1816                    uf_union(&mut uf, a, b);
1817                }
1818            }
1819            _ => {}
1820        },
1821        false,
1822    );
1823
1824    // Pass 2: rewrite all LocationIds.
1825    transform_bottom_up(
1826        ir,
1827        &mut |_| {},
1828        &mut |node: &mut HydroNode| {
1829            remap_location(&mut node.metadata_mut().location_id, &mut uf);
1830        },
1831        false,
1832    );
1833}
1834
1835#[cfg(feature = "build")]
1836pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1837    let mut builders = SecondaryMap::new();
1838    let mut seen_tees = HashMap::new();
1839    let mut built_tees = HashMap::new();
1840    let mut next_stmt_id = crate::Counter::<StmtId>::default();
1841    let mut fold_hooked_idents = HashSet::new();
1842    for leaf in ir {
1843        leaf.emit(
1844            &mut builders,
1845            &mut seen_tees,
1846            &mut built_tees,
1847            &mut next_stmt_id,
1848            &mut fold_hooked_idents,
1849        );
1850    }
1851    builders
1852}
1853
1854#[cfg(feature = "build")]
1855pub fn traverse_dfir(
1856    ir: &mut [HydroRoot],
1857    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1858    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1859) {
1860    let mut seen_tees = HashMap::new();
1861    let mut built_tees = HashMap::new();
1862    let mut next_stmt_id = crate::Counter::<StmtId>::default();
1863    let mut fold_hooked_idents = HashSet::new();
1864    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1865    ir.iter_mut().for_each(|leaf| {
1866        leaf.emit_core(
1867            &mut callback,
1868            &mut seen_tees,
1869            &mut built_tees,
1870            &mut next_stmt_id,
1871            &mut fold_hooked_idents,
1872        );
1873    });
1874}
1875
1876pub fn transform_bottom_up(
1877    ir: &mut [HydroRoot],
1878    transform_root: &mut impl FnMut(&mut HydroRoot),
1879    transform_node: &mut impl FnMut(&mut HydroNode),
1880    check_well_formed: bool,
1881) {
1882    let mut seen_tees = HashMap::new();
1883    ir.iter_mut().for_each(|leaf| {
1884        leaf.transform_bottom_up(
1885            transform_root,
1886            transform_node,
1887            &mut seen_tees,
1888            check_well_formed,
1889        );
1890    });
1891}
1892
1893pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1894    let mut seen_tees = HashMap::new();
1895    ir.iter()
1896        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1897        .collect()
1898}
1899
1900type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1901thread_local! {
1902    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1903    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1904    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1905    /// on subsequent encounters, preventing infinite loops.
1906    static SERIALIZED_SHARED: PrintedTees
1907        = const { RefCell::new(None) };
1908}
1909
1910pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1911    PRINTED_TEES.with(|printed_tees| {
1912        let mut printed_tees_mut = printed_tees.borrow_mut();
1913        *printed_tees_mut = Some((0, HashMap::new()));
1914        drop(printed_tees_mut);
1915
1916        let ret = f();
1917
1918        let mut printed_tees_mut = printed_tees.borrow_mut();
1919        *printed_tees_mut = None;
1920
1921        ret
1922    })
1923}
1924
1925/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1926/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1927/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1928/// back-reference.  The tracking state is restored when `f` returns or panics.
1929pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1930    let _guard = SerializedSharedGuard::enter();
1931    f()
1932}
1933
1934/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
1935/// making `serialize_dedup_shared` re-entrant and panic-safe.
1936struct SerializedSharedGuard {
1937    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1938}
1939
1940impl SerializedSharedGuard {
1941    fn enter() -> Self {
1942        let previous = SERIALIZED_SHARED.with(|cell| {
1943            let mut guard = cell.borrow_mut();
1944            guard.replace((0, HashMap::new()))
1945        });
1946        Self { previous }
1947    }
1948}
1949
1950impl Drop for SerializedSharedGuard {
1951    fn drop(&mut self) {
1952        SERIALIZED_SHARED.with(|cell| {
1953            *cell.borrow_mut() = self.previous.take();
1954        });
1955    }
1956}
1957
1958pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1959
1960impl serde::Serialize for SharedNode {
1961    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
1962    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
1963    /// same subtree every time and, if the graph ever contains a cycle, loop
1964    /// forever.
1965    ///
1966    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
1967    /// integer id.  The first time we see a pointer we assign it the next id and
1968    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
1969    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
1970    /// recursion.  Requires an active `serialize_dedup_shared` scope.
1971    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1972        SERIALIZED_SHARED.with(|cell| {
1973            let mut guard = cell.borrow_mut();
1974            // (next_id, pointer → assigned_id)
1975            let state = guard.as_mut().ok_or_else(|| {
1976                serde::ser::Error::custom(
1977                    "SharedNode serialization requires an active serialize_dedup_shared scope",
1978                )
1979            })?;
1980            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1981
1982            if let Some(&id) = state.1.get(&ptr) {
1983                drop(guard);
1984                use serde::ser::SerializeMap;
1985                let mut map = serializer.serialize_map(Some(1))?;
1986                map.serialize_entry("$shared_ref", &id)?;
1987                map.end()
1988            } else {
1989                let id = state.0;
1990                state.0 += 1;
1991                state.1.insert(ptr, id);
1992                drop(guard);
1993
1994                use serde::ser::SerializeMap;
1995                let mut map = serializer.serialize_map(Some(2))?;
1996                map.serialize_entry("$shared", &id)?;
1997                map.serialize_entry("node", &*self.0.borrow())?;
1998                map.end()
1999            }
2000        })
2001    }
2002}
2003
2004impl SharedNode {
2005    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2006        Rc::as_ptr(&self.0)
2007    }
2008}
2009
2010impl Debug for SharedNode {
2011    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2012        PRINTED_TEES.with(|printed_tees| {
2013            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2014            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2015
2016            if let Some(printed_tees_mut) = printed_tees_mut {
2017                if let Some(existing) = printed_tees_mut
2018                    .1
2019                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2020                {
2021                    write!(f, "<shared {}>", existing)
2022                } else {
2023                    let next_id = printed_tees_mut.0;
2024                    printed_tees_mut.0 += 1;
2025                    printed_tees_mut
2026                        .1
2027                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2028                    drop(printed_tees_mut_borrow);
2029                    write!(f, "<shared {}>: ", next_id)?;
2030                    Debug::fmt(&self.0.borrow(), f)
2031                }
2032            } else {
2033                drop(printed_tees_mut_borrow);
2034                write!(f, "<shared>: ")?;
2035                Debug::fmt(&self.0.borrow(), f)
2036            }
2037        })
2038    }
2039}
2040
2041impl Hash for SharedNode {
2042    fn hash<H: Hasher>(&self, state: &mut H) {
2043        self.0.borrow_mut().hash(state);
2044    }
2045}
2046
2047/// A counter for tracking singleton access groups on a `HydroNode::Reference`.
2048///
2049/// Each mutable access increments the counter (before and after) to isolate itself in its own group;
2050/// immutable accesses share the current group.
2051#[derive(Debug)]
2052pub enum AccessCounter {
2053    Counting(Cell<u32>),
2054    Frozen(u32),
2055}
2056
2057impl AccessCounter {
2058    pub fn new() -> Self {
2059        Self::Counting(Cell::new(0))
2060    }
2061
2062    /// Assign the next access group for this reference.
2063    /// Mutable accesses get an isolated group (counter increments before and after).
2064    /// Immutable accesses share the current group.
2065    pub fn next_group(&self, is_mut: bool) -> Self {
2066        let AccessCounter::Counting(count) = self else {
2067            panic!("Cannot count on `AccessCounter::Frozen`");
2068        };
2069        let c = if is_mut {
2070            let c = count.get() + 1;
2071            count.set(c + 1);
2072            c
2073        } else {
2074            count.get()
2075        };
2076        Self::Frozen(c)
2077    }
2078
2079    /// Creates a frozen counter to prevent further counting.
2080    pub fn freeze(&self) -> Self {
2081        Self::Frozen(match self {
2082            Self::Counting(count) => count.get(),
2083            Self::Frozen(count) => *count,
2084        })
2085    }
2086
2087    pub fn frozen_group(&self) -> u32 {
2088        let Self::Frozen(count) = self else {
2089            panic!("`AccessCounter` not frozen");
2090        };
2091        *count
2092    }
2093}
2094
2095impl Default for AccessCounter {
2096    fn default() -> Self {
2097        Self::new()
2098    }
2099}
2100
2101impl Hash for AccessCounter {
2102    fn hash<H: Hasher>(&self, _state: &mut H) {
2103        // Access counter does not participate in hashing — it is runtime bookkeeping.
2104    }
2105}
2106
2107impl serde::Serialize for AccessCounter {
2108    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2109        let count = match self {
2110            AccessCounter::Counting(count) => count.get(),
2111            AccessCounter::Frozen(count) => *count,
2112        };
2113        count.serialize(serializer)
2114    }
2115}
2116
2117#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2118pub enum BoundKind {
2119    Unbounded,
2120    Bounded,
2121}
2122
2123#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2124pub enum StreamOrder {
2125    NoOrder,
2126    TotalOrder,
2127}
2128
2129#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2130pub enum StreamRetry {
2131    AtLeastOnce,
2132    ExactlyOnce,
2133}
2134
2135#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2136pub enum KeyedSingletonBoundKind {
2137    Unbounded,
2138    MonotonicKeys,
2139    MonotonicValue,
2140    BoundedValue,
2141    Bounded,
2142}
2143
2144#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2145pub enum SingletonBoundKind {
2146    Unbounded,
2147    Monotonic,
2148    Bounded,
2149}
2150
2151#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2152pub enum CollectionKind {
2153    Stream {
2154        bound: BoundKind,
2155        order: StreamOrder,
2156        retry: StreamRetry,
2157        element_type: DebugType,
2158    },
2159    Singleton {
2160        bound: SingletonBoundKind,
2161        element_type: DebugType,
2162    },
2163    Optional {
2164        bound: BoundKind,
2165        element_type: DebugType,
2166    },
2167    KeyedStream {
2168        bound: BoundKind,
2169        value_order: StreamOrder,
2170        value_retry: StreamRetry,
2171        key_type: DebugType,
2172        value_type: DebugType,
2173    },
2174    KeyedSingleton {
2175        bound: KeyedSingletonBoundKind,
2176        key_type: DebugType,
2177        value_type: DebugType,
2178    },
2179}
2180
2181impl CollectionKind {
2182    pub fn is_bounded(&self) -> bool {
2183        matches!(
2184            self,
2185            CollectionKind::Stream {
2186                bound: BoundKind::Bounded,
2187                ..
2188            } | CollectionKind::Singleton {
2189                bound: SingletonBoundKind::Bounded,
2190                ..
2191            } | CollectionKind::Optional {
2192                bound: BoundKind::Bounded,
2193                ..
2194            } | CollectionKind::KeyedStream {
2195                bound: BoundKind::Bounded,
2196                ..
2197            } | CollectionKind::KeyedSingleton {
2198                bound: KeyedSingletonBoundKind::Bounded,
2199                ..
2200            }
2201        )
2202    }
2203}
2204
2205#[derive(Clone, serde::Serialize)]
2206pub struct HydroIrMetadata {
2207    pub location_id: LocationId,
2208    pub collection_kind: CollectionKind,
2209    pub consistency: Option<ClusterConsistency>,
2210    pub cardinality: Option<usize>,
2211    pub tag: Option<String>,
2212    pub op: HydroIrOpMetadata,
2213}
2214
2215// HydroIrMetadata shouldn't be used to hash or compare
2216impl Hash for HydroIrMetadata {
2217    fn hash<H: Hasher>(&self, _: &mut H) {}
2218}
2219
2220impl PartialEq for HydroIrMetadata {
2221    fn eq(&self, _: &Self) -> bool {
2222        true
2223    }
2224}
2225
2226impl Eq for HydroIrMetadata {}
2227
2228impl Debug for HydroIrMetadata {
2229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2230        f.debug_struct("HydroIrMetadata")
2231            .field("location_id", &self.location_id)
2232            .field("collection_kind", &self.collection_kind)
2233            .finish()
2234    }
2235}
2236
2237/// Metadata that is specific to the operator itself, rather than its outputs.
2238/// This is available on _both_ inner nodes and roots.
2239#[derive(Clone, serde::Serialize)]
2240pub struct HydroIrOpMetadata {
2241    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2242    pub backtrace: Backtrace,
2243    pub cpu_usage: Option<f64>,
2244    pub network_recv_cpu_usage: Option<f64>,
2245    pub id: Option<usize>,
2246}
2247
2248impl HydroIrOpMetadata {
2249    #[expect(
2250        clippy::new_without_default,
2251        reason = "explicit calls to new ensure correct backtrace bounds"
2252    )]
2253    pub fn new() -> HydroIrOpMetadata {
2254        Self::new_with_skip(1)
2255    }
2256
2257    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2258        HydroIrOpMetadata {
2259            backtrace: Backtrace::get_backtrace(2 + skip_count),
2260            cpu_usage: None,
2261            network_recv_cpu_usage: None,
2262            id: None,
2263        }
2264    }
2265}
2266
2267impl Debug for HydroIrOpMetadata {
2268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2269        f.debug_struct("HydroIrOpMetadata").finish()
2270    }
2271}
2272
2273impl Hash for HydroIrOpMetadata {
2274    fn hash<H: Hasher>(&self, _: &mut H) {}
2275}
2276
2277/// An intermediate node in a Hydro graph, which consumes data
2278/// from upstream nodes and emits data to downstream nodes.
2279#[derive(Debug, Hash, serde::Serialize)]
2280pub enum HydroNode {
2281    Placeholder,
2282
2283    /// Manually "casts" between two different collection kinds.
2284    ///
2285    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2286    /// correctness checks. In particular, the user must ensure that every possible
2287    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2288    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2289    /// collection. This ensures that the simulator does not miss any possible outputs.
2290    Cast {
2291        inner: Box<HydroNode>,
2292        metadata: HydroIrMetadata,
2293    },
2294
2295    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2296    /// interpretation of the input stream.
2297    ///
2298    /// In production, this simply passes through the input, but in simulation, this operator
2299    /// explicitly selects a randomized interpretation.
2300    ObserveNonDet {
2301        inner: Box<HydroNode>,
2302        trusted: bool, // if true, we do not need to simulate non-determinism
2303        metadata: HydroIrMetadata,
2304    },
2305
2306    Source {
2307        source: HydroSource,
2308        metadata: HydroIrMetadata,
2309    },
2310
2311    SingletonSource {
2312        value: DebugExpr,
2313        first_tick_only: bool,
2314        metadata: HydroIrMetadata,
2315    },
2316
2317    CycleSource {
2318        cycle_id: CycleId,
2319        metadata: HydroIrMetadata,
2320    },
2321
2322    Tee {
2323        inner: SharedNode,
2324        metadata: HydroIrMetadata,
2325    },
2326
2327    /// A reference materialization point. Wraps a SharedNode so that:
2328    /// - The pipe output delivers data to one consumer
2329    /// - `#var` references can borrow the value from the slot
2330    ///
2331    /// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or
2332    /// `-> handoff()` depending on `kind`.
2333    ///
2334    /// Uses the same `built_tees` dedup pattern as `Tee`.
2335    Reference {
2336        inner: SharedNode,
2337        kind: crate::handoff_ref::HandoffRefKind,
2338        access_counter: AccessCounter,
2339        metadata: HydroIrMetadata,
2340    },
2341
2342    Partition {
2343        inner: SharedNode,
2344        f: ClosureExpr,
2345        is_true: bool,
2346        metadata: HydroIrMetadata,
2347    },
2348
2349    BeginAtomic {
2350        inner: Box<HydroNode>,
2351        metadata: HydroIrMetadata,
2352    },
2353
2354    EndAtomic {
2355        inner: Box<HydroNode>,
2356        metadata: HydroIrMetadata,
2357    },
2358
2359    Batch {
2360        inner: Box<HydroNode>,
2361        metadata: HydroIrMetadata,
2362    },
2363
2364    YieldConcat {
2365        inner: Box<HydroNode>,
2366        metadata: HydroIrMetadata,
2367    },
2368
2369    Chain {
2370        first: Box<HydroNode>,
2371        second: Box<HydroNode>,
2372        metadata: HydroIrMetadata,
2373    },
2374
2375    MergeOrdered {
2376        first: Box<HydroNode>,
2377        second: Box<HydroNode>,
2378        metadata: HydroIrMetadata,
2379    },
2380
2381    ChainFirst {
2382        first: Box<HydroNode>,
2383        second: Box<HydroNode>,
2384        metadata: HydroIrMetadata,
2385    },
2386
2387    CrossProduct {
2388        left: Box<HydroNode>,
2389        right: Box<HydroNode>,
2390        metadata: HydroIrMetadata,
2391    },
2392
2393    CrossSingleton {
2394        left: Box<HydroNode>,
2395        right: Box<HydroNode>,
2396        metadata: HydroIrMetadata,
2397    },
2398
2399    Join {
2400        left: Box<HydroNode>,
2401        right: Box<HydroNode>,
2402        metadata: HydroIrMetadata,
2403    },
2404
2405    /// Asymmetric join where the right (build) side is bounded.
2406    /// The build side is accumulated (stratum-delayed) into a hash table,
2407    /// then the left (probe) side streams through preserving its ordering.
2408    JoinHalf {
2409        left: Box<HydroNode>,
2410        right: Box<HydroNode>,
2411        metadata: HydroIrMetadata,
2412    },
2413
2414    Difference {
2415        pos: Box<HydroNode>,
2416        neg: Box<HydroNode>,
2417        metadata: HydroIrMetadata,
2418    },
2419
2420    AntiJoin {
2421        pos: Box<HydroNode>,
2422        neg: Box<HydroNode>,
2423        metadata: HydroIrMetadata,
2424    },
2425
2426    ResolveFutures {
2427        input: Box<HydroNode>,
2428        metadata: HydroIrMetadata,
2429    },
2430    ResolveFuturesBlocking {
2431        input: Box<HydroNode>,
2432        metadata: HydroIrMetadata,
2433    },
2434    ResolveFuturesOrdered {
2435        input: Box<HydroNode>,
2436        metadata: HydroIrMetadata,
2437    },
2438
2439    Map {
2440        f: ClosureExpr,
2441        input: Box<HydroNode>,
2442        metadata: HydroIrMetadata,
2443    },
2444    FlatMap {
2445        f: ClosureExpr,
2446        input: Box<HydroNode>,
2447        metadata: HydroIrMetadata,
2448    },
2449    FlatMapStreamBlocking {
2450        f: ClosureExpr,
2451        input: Box<HydroNode>,
2452        metadata: HydroIrMetadata,
2453    },
2454    Filter {
2455        f: ClosureExpr,
2456        input: Box<HydroNode>,
2457        metadata: HydroIrMetadata,
2458    },
2459    FilterMap {
2460        f: ClosureExpr,
2461        input: Box<HydroNode>,
2462        metadata: HydroIrMetadata,
2463    },
2464
2465    DeferTick {
2466        input: Box<HydroNode>,
2467        metadata: HydroIrMetadata,
2468    },
2469    Enumerate {
2470        input: Box<HydroNode>,
2471        metadata: HydroIrMetadata,
2472    },
2473    Inspect {
2474        f: ClosureExpr,
2475        input: Box<HydroNode>,
2476        metadata: HydroIrMetadata,
2477    },
2478
2479    Unique {
2480        input: Box<HydroNode>,
2481        metadata: HydroIrMetadata,
2482    },
2483
2484    Sort {
2485        input: Box<HydroNode>,
2486        metadata: HydroIrMetadata,
2487    },
2488    Fold {
2489        init: ClosureExpr,
2490        acc: ClosureExpr,
2491        input: Box<HydroNode>,
2492        metadata: HydroIrMetadata,
2493    },
2494
2495    Scan {
2496        init: ClosureExpr,
2497        acc: ClosureExpr,
2498        input: Box<HydroNode>,
2499        metadata: HydroIrMetadata,
2500    },
2501    ScanAsyncBlocking {
2502        init: ClosureExpr,
2503        acc: ClosureExpr,
2504        input: Box<HydroNode>,
2505        metadata: HydroIrMetadata,
2506    },
2507    FoldKeyed {
2508        init: ClosureExpr,
2509        acc: ClosureExpr,
2510        input: Box<HydroNode>,
2511        metadata: HydroIrMetadata,
2512    },
2513
2514    Reduce {
2515        f: ClosureExpr,
2516        input: Box<HydroNode>,
2517        metadata: HydroIrMetadata,
2518    },
2519    ReduceKeyed {
2520        f: ClosureExpr,
2521        input: Box<HydroNode>,
2522        metadata: HydroIrMetadata,
2523    },
2524    ReduceKeyedWatermark {
2525        f: ClosureExpr,
2526        input: Box<HydroNode>,
2527        watermark: Box<HydroNode>,
2528        metadata: HydroIrMetadata,
2529    },
2530
2531    Network {
2532        name: Option<String>,
2533        networking_info: crate::networking::NetworkingInfo,
2534        serialize_fn: Option<DebugExpr>,
2535        instantiate_fn: DebugInstantiate,
2536        deserialize_fn: Option<DebugExpr>,
2537        input: Box<HydroNode>,
2538        metadata: HydroIrMetadata,
2539    },
2540
2541    ExternalInput {
2542        from_external_key: LocationKey,
2543        from_port_id: ExternalPortId,
2544        from_many: bool,
2545        codec_type: DebugType,
2546        #[serde(skip)]
2547        port_hint: NetworkHint,
2548        instantiate_fn: DebugInstantiate,
2549        deserialize_fn: Option<DebugExpr>,
2550        metadata: HydroIrMetadata,
2551    },
2552
2553    Counter {
2554        tag: String,
2555        duration: DebugExpr,
2556        prefix: String,
2557        input: Box<HydroNode>,
2558        metadata: HydroIrMetadata,
2559    },
2560
2561    AssertIsConsistent {
2562        inner: Box<HydroNode>,
2563        trusted: bool,
2564        metadata: HydroIrMetadata,
2565    },
2566
2567    UnboundSingleton {
2568        inner: Box<HydroNode>,
2569        metadata: HydroIrMetadata,
2570    },
2571}
2572
2573pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2574pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2575
2576impl HydroNode {
2577    pub fn transform_bottom_up(
2578        &mut self,
2579        transform: &mut impl FnMut(&mut HydroNode),
2580        seen_tees: &mut SeenSharedNodes,
2581        check_well_formed: bool,
2582    ) {
2583        self.transform_children(
2584            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2585            seen_tees,
2586        );
2587
2588        transform(self);
2589
2590        let self_location = self.metadata().location_id.root();
2591
2592        if check_well_formed {
2593            match &*self {
2594                HydroNode::Network { .. } => {}
2595                _ => {
2596                    self.input_metadata().iter().for_each(|i| {
2597                        if i.location_id.root() != self_location {
2598                            panic!(
2599                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2600                                i,
2601                                i.location_id.root(),
2602                                self,
2603                                self_location
2604                            )
2605                        }
2606                    });
2607                }
2608            }
2609        }
2610    }
2611
2612    #[inline(always)]
2613    pub fn transform_children(
2614        &mut self,
2615        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2616        seen_tees: &mut SeenSharedNodes,
2617    ) {
2618        match self {
2619            HydroNode::Placeholder => {
2620                panic!();
2621            }
2622
2623            HydroNode::Source { .. }
2624            | HydroNode::SingletonSource { .. }
2625            | HydroNode::CycleSource { .. }
2626            | HydroNode::ExternalInput { .. } => {}
2627
2628            HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2629                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2630                    *inner = SharedNode(transformed.clone());
2631                } else {
2632                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2633                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2634                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2635                    transform(&mut orig, seen_tees);
2636                    *transformed_cell.borrow_mut() = orig;
2637                    *inner = SharedNode(transformed_cell);
2638                }
2639            }
2640
2641            HydroNode::Partition { inner, f, .. } => {
2642                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2643                    *inner = SharedNode(transformed.clone());
2644                } else {
2645                    f.transform_children(&mut transform, seen_tees);
2646                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2647                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2648                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2649                    transform(&mut orig, seen_tees);
2650                    *transformed_cell.borrow_mut() = orig;
2651                    *inner = SharedNode(transformed_cell);
2652                }
2653            }
2654
2655            HydroNode::Cast { inner, .. }
2656            | HydroNode::ObserveNonDet { inner, .. }
2657            | HydroNode::BeginAtomic { inner, .. }
2658            | HydroNode::EndAtomic { inner, .. }
2659            | HydroNode::Batch { inner, .. }
2660            | HydroNode::YieldConcat { inner, .. }
2661            | HydroNode::UnboundSingleton { inner, .. }
2662            | HydroNode::AssertIsConsistent { inner, .. } => {
2663                transform(inner.as_mut(), seen_tees);
2664            }
2665
2666            HydroNode::Chain { first, second, .. } => {
2667                transform(first.as_mut(), seen_tees);
2668                transform(second.as_mut(), seen_tees);
2669            }
2670
2671            HydroNode::MergeOrdered { first, second, .. } => {
2672                transform(first.as_mut(), seen_tees);
2673                transform(second.as_mut(), seen_tees);
2674            }
2675
2676            HydroNode::ChainFirst { first, second, .. } => {
2677                transform(first.as_mut(), seen_tees);
2678                transform(second.as_mut(), seen_tees);
2679            }
2680
2681            HydroNode::CrossSingleton { left, right, .. }
2682            | HydroNode::CrossProduct { left, right, .. }
2683            | HydroNode::Join { left, right, .. }
2684            | HydroNode::JoinHalf { left, right, .. } => {
2685                transform(left.as_mut(), seen_tees);
2686                transform(right.as_mut(), seen_tees);
2687            }
2688
2689            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2690                transform(pos.as_mut(), seen_tees);
2691                transform(neg.as_mut(), seen_tees);
2692            }
2693
2694            HydroNode::Map { f, input, .. } => {
2695                f.transform_children(&mut transform, seen_tees);
2696                transform(input.as_mut(), seen_tees);
2697            }
2698            HydroNode::FlatMap { f, input, .. }
2699            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2700            | HydroNode::Filter { f, input, .. }
2701            | HydroNode::FilterMap { f, input, .. }
2702            | HydroNode::Inspect { f, input, .. }
2703            | HydroNode::Reduce { f, input, .. }
2704            | HydroNode::ReduceKeyed { f, input, .. } => {
2705                f.transform_children(&mut transform, seen_tees);
2706                transform(input.as_mut(), seen_tees);
2707            }
2708            HydroNode::ReduceKeyedWatermark {
2709                f,
2710                input,
2711                watermark,
2712                ..
2713            } => {
2714                f.transform_children(&mut transform, seen_tees);
2715                transform(input.as_mut(), seen_tees);
2716                transform(watermark.as_mut(), seen_tees);
2717            }
2718            HydroNode::Fold {
2719                init, acc, input, ..
2720            }
2721            | HydroNode::Scan {
2722                init, acc, input, ..
2723            }
2724            | HydroNode::ScanAsyncBlocking {
2725                init, acc, input, ..
2726            }
2727            | HydroNode::FoldKeyed {
2728                init, acc, input, ..
2729            } => {
2730                init.transform_children(&mut transform, seen_tees);
2731                acc.transform_children(&mut transform, seen_tees);
2732                transform(input.as_mut(), seen_tees);
2733            }
2734            HydroNode::ResolveFutures { input, .. }
2735            | HydroNode::ResolveFuturesBlocking { input, .. }
2736            | HydroNode::ResolveFuturesOrdered { input, .. }
2737            | HydroNode::Sort { input, .. }
2738            | HydroNode::DeferTick { input, .. }
2739            | HydroNode::Enumerate { input, .. }
2740            | HydroNode::Unique { input, .. }
2741            | HydroNode::Network { input, .. }
2742            | HydroNode::Counter { input, .. } => {
2743                transform(input.as_mut(), seen_tees);
2744            }
2745        }
2746    }
2747
2748    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2749        match self {
2750            HydroNode::Placeholder => HydroNode::Placeholder,
2751            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2752                inner: Box::new(inner.deep_clone(seen_tees)),
2753                metadata: metadata.clone(),
2754            },
2755            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2756                inner: Box::new(inner.deep_clone(seen_tees)),
2757                metadata: metadata.clone(),
2758            },
2759            HydroNode::ObserveNonDet {
2760                inner,
2761                trusted,
2762                metadata,
2763            } => HydroNode::ObserveNonDet {
2764                inner: Box::new(inner.deep_clone(seen_tees)),
2765                trusted: *trusted,
2766                metadata: metadata.clone(),
2767            },
2768            HydroNode::AssertIsConsistent {
2769                inner,
2770                trusted,
2771                metadata,
2772            } => HydroNode::AssertIsConsistent {
2773                inner: Box::new(inner.deep_clone(seen_tees)),
2774                trusted: *trusted,
2775                metadata: metadata.clone(),
2776            },
2777            HydroNode::Source { source, metadata } => HydroNode::Source {
2778                source: source.clone(),
2779                metadata: metadata.clone(),
2780            },
2781            HydroNode::SingletonSource {
2782                value,
2783                first_tick_only,
2784                metadata,
2785            } => HydroNode::SingletonSource {
2786                value: value.clone(),
2787                first_tick_only: *first_tick_only,
2788                metadata: metadata.clone(),
2789            },
2790            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2791                cycle_id: *cycle_id,
2792                metadata: metadata.clone(),
2793            },
2794            HydroNode::Tee { inner, metadata }
2795            | HydroNode::Reference {
2796                inner, metadata, ..
2797            } => {
2798                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2799                    SharedNode(transformed.clone())
2800                } else {
2801                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2802                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2803                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2804                    *new_rc.borrow_mut() = cloned;
2805                    SharedNode(new_rc)
2806                };
2807                if let HydroNode::Reference {
2808                    kind,
2809                    access_counter,
2810                    ..
2811                } = self
2812                {
2813                    HydroNode::Reference {
2814                        inner: cloned_inner,
2815                        kind: *kind,
2816                        access_counter: access_counter.freeze(),
2817                        metadata: metadata.clone(),
2818                    }
2819                } else {
2820                    HydroNode::Tee {
2821                        inner: cloned_inner,
2822                        metadata: metadata.clone(),
2823                    }
2824                }
2825            }
2826            HydroNode::Partition {
2827                inner,
2828                f,
2829                is_true,
2830                metadata,
2831            } => {
2832                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2833                    HydroNode::Partition {
2834                        inner: SharedNode(transformed.clone()),
2835                        f: f.deep_clone(seen_tees),
2836                        is_true: *is_true,
2837                        metadata: metadata.clone(),
2838                    }
2839                } else {
2840                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2841                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2842                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2843                    *new_rc.borrow_mut() = cloned;
2844                    HydroNode::Partition {
2845                        inner: SharedNode(new_rc),
2846                        f: f.deep_clone(seen_tees),
2847                        is_true: *is_true,
2848                        metadata: metadata.clone(),
2849                    }
2850                }
2851            }
2852            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2853                inner: Box::new(inner.deep_clone(seen_tees)),
2854                metadata: metadata.clone(),
2855            },
2856            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2857                inner: Box::new(inner.deep_clone(seen_tees)),
2858                metadata: metadata.clone(),
2859            },
2860            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2861                inner: Box::new(inner.deep_clone(seen_tees)),
2862                metadata: metadata.clone(),
2863            },
2864            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2865                inner: Box::new(inner.deep_clone(seen_tees)),
2866                metadata: metadata.clone(),
2867            },
2868            HydroNode::Chain {
2869                first,
2870                second,
2871                metadata,
2872            } => HydroNode::Chain {
2873                first: Box::new(first.deep_clone(seen_tees)),
2874                second: Box::new(second.deep_clone(seen_tees)),
2875                metadata: metadata.clone(),
2876            },
2877            HydroNode::MergeOrdered {
2878                first,
2879                second,
2880                metadata,
2881            } => HydroNode::MergeOrdered {
2882                first: Box::new(first.deep_clone(seen_tees)),
2883                second: Box::new(second.deep_clone(seen_tees)),
2884                metadata: metadata.clone(),
2885            },
2886            HydroNode::ChainFirst {
2887                first,
2888                second,
2889                metadata,
2890            } => HydroNode::ChainFirst {
2891                first: Box::new(first.deep_clone(seen_tees)),
2892                second: Box::new(second.deep_clone(seen_tees)),
2893                metadata: metadata.clone(),
2894            },
2895            HydroNode::CrossProduct {
2896                left,
2897                right,
2898                metadata,
2899            } => HydroNode::CrossProduct {
2900                left: Box::new(left.deep_clone(seen_tees)),
2901                right: Box::new(right.deep_clone(seen_tees)),
2902                metadata: metadata.clone(),
2903            },
2904            HydroNode::CrossSingleton {
2905                left,
2906                right,
2907                metadata,
2908            } => HydroNode::CrossSingleton {
2909                left: Box::new(left.deep_clone(seen_tees)),
2910                right: Box::new(right.deep_clone(seen_tees)),
2911                metadata: metadata.clone(),
2912            },
2913            HydroNode::Join {
2914                left,
2915                right,
2916                metadata,
2917            } => HydroNode::Join {
2918                left: Box::new(left.deep_clone(seen_tees)),
2919                right: Box::new(right.deep_clone(seen_tees)),
2920                metadata: metadata.clone(),
2921            },
2922            HydroNode::JoinHalf {
2923                left,
2924                right,
2925                metadata,
2926            } => HydroNode::JoinHalf {
2927                left: Box::new(left.deep_clone(seen_tees)),
2928                right: Box::new(right.deep_clone(seen_tees)),
2929                metadata: metadata.clone(),
2930            },
2931            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2932                pos: Box::new(pos.deep_clone(seen_tees)),
2933                neg: Box::new(neg.deep_clone(seen_tees)),
2934                metadata: metadata.clone(),
2935            },
2936            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2937                pos: Box::new(pos.deep_clone(seen_tees)),
2938                neg: Box::new(neg.deep_clone(seen_tees)),
2939                metadata: metadata.clone(),
2940            },
2941            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2942                input: Box::new(input.deep_clone(seen_tees)),
2943                metadata: metadata.clone(),
2944            },
2945            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2946                HydroNode::ResolveFuturesBlocking {
2947                    input: Box::new(input.deep_clone(seen_tees)),
2948                    metadata: metadata.clone(),
2949                }
2950            }
2951            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2952                HydroNode::ResolveFuturesOrdered {
2953                    input: Box::new(input.deep_clone(seen_tees)),
2954                    metadata: metadata.clone(),
2955                }
2956            }
2957            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2958                f: f.deep_clone(seen_tees),
2959                input: Box::new(input.deep_clone(seen_tees)),
2960                metadata: metadata.clone(),
2961            },
2962            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2963                f: f.deep_clone(seen_tees),
2964                input: Box::new(input.deep_clone(seen_tees)),
2965                metadata: metadata.clone(),
2966            },
2967            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2968                HydroNode::FlatMapStreamBlocking {
2969                    f: f.deep_clone(seen_tees),
2970                    input: Box::new(input.deep_clone(seen_tees)),
2971                    metadata: metadata.clone(),
2972                }
2973            }
2974            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2975                f: f.deep_clone(seen_tees),
2976                input: Box::new(input.deep_clone(seen_tees)),
2977                metadata: metadata.clone(),
2978            },
2979            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2980                f: f.deep_clone(seen_tees),
2981                input: Box::new(input.deep_clone(seen_tees)),
2982                metadata: metadata.clone(),
2983            },
2984            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2985                input: Box::new(input.deep_clone(seen_tees)),
2986                metadata: metadata.clone(),
2987            },
2988            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2989                input: Box::new(input.deep_clone(seen_tees)),
2990                metadata: metadata.clone(),
2991            },
2992            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2993                f: f.deep_clone(seen_tees),
2994                input: Box::new(input.deep_clone(seen_tees)),
2995                metadata: metadata.clone(),
2996            },
2997            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2998                input: Box::new(input.deep_clone(seen_tees)),
2999                metadata: metadata.clone(),
3000            },
3001            HydroNode::Sort { input, metadata } => HydroNode::Sort {
3002                input: Box::new(input.deep_clone(seen_tees)),
3003                metadata: metadata.clone(),
3004            },
3005            HydroNode::Fold {
3006                init,
3007                acc,
3008                input,
3009                metadata,
3010            } => HydroNode::Fold {
3011                init: init.deep_clone(seen_tees),
3012                acc: acc.deep_clone(seen_tees),
3013                input: Box::new(input.deep_clone(seen_tees)),
3014                metadata: metadata.clone(),
3015            },
3016            HydroNode::Scan {
3017                init,
3018                acc,
3019                input,
3020                metadata,
3021            } => HydroNode::Scan {
3022                init: init.deep_clone(seen_tees),
3023                acc: acc.deep_clone(seen_tees),
3024                input: Box::new(input.deep_clone(seen_tees)),
3025                metadata: metadata.clone(),
3026            },
3027            HydroNode::ScanAsyncBlocking {
3028                init,
3029                acc,
3030                input,
3031                metadata,
3032            } => HydroNode::ScanAsyncBlocking {
3033                init: init.deep_clone(seen_tees),
3034                acc: acc.deep_clone(seen_tees),
3035                input: Box::new(input.deep_clone(seen_tees)),
3036                metadata: metadata.clone(),
3037            },
3038            HydroNode::FoldKeyed {
3039                init,
3040                acc,
3041                input,
3042                metadata,
3043            } => HydroNode::FoldKeyed {
3044                init: init.deep_clone(seen_tees),
3045                acc: acc.deep_clone(seen_tees),
3046                input: Box::new(input.deep_clone(seen_tees)),
3047                metadata: metadata.clone(),
3048            },
3049            HydroNode::ReduceKeyedWatermark {
3050                f,
3051                input,
3052                watermark,
3053                metadata,
3054            } => HydroNode::ReduceKeyedWatermark {
3055                f: f.deep_clone(seen_tees),
3056                input: Box::new(input.deep_clone(seen_tees)),
3057                watermark: Box::new(watermark.deep_clone(seen_tees)),
3058                metadata: metadata.clone(),
3059            },
3060            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3061                f: f.deep_clone(seen_tees),
3062                input: Box::new(input.deep_clone(seen_tees)),
3063                metadata: metadata.clone(),
3064            },
3065            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3066                f: f.deep_clone(seen_tees),
3067                input: Box::new(input.deep_clone(seen_tees)),
3068                metadata: metadata.clone(),
3069            },
3070            HydroNode::Network {
3071                name,
3072                networking_info,
3073                serialize_fn,
3074                instantiate_fn,
3075                deserialize_fn,
3076                input,
3077                metadata,
3078            } => HydroNode::Network {
3079                name: name.clone(),
3080                networking_info: networking_info.clone(),
3081                serialize_fn: serialize_fn.clone(),
3082                instantiate_fn: instantiate_fn.clone(),
3083                deserialize_fn: deserialize_fn.clone(),
3084                input: Box::new(input.deep_clone(seen_tees)),
3085                metadata: metadata.clone(),
3086            },
3087            HydroNode::ExternalInput {
3088                from_external_key,
3089                from_port_id,
3090                from_many,
3091                codec_type,
3092                port_hint,
3093                instantiate_fn,
3094                deserialize_fn,
3095                metadata,
3096            } => HydroNode::ExternalInput {
3097                from_external_key: *from_external_key,
3098                from_port_id: *from_port_id,
3099                from_many: *from_many,
3100                codec_type: codec_type.clone(),
3101                port_hint: *port_hint,
3102                instantiate_fn: instantiate_fn.clone(),
3103                deserialize_fn: deserialize_fn.clone(),
3104                metadata: metadata.clone(),
3105            },
3106            HydroNode::Counter {
3107                tag,
3108                duration,
3109                prefix,
3110                input,
3111                metadata,
3112            } => HydroNode::Counter {
3113                tag: tag.clone(),
3114                duration: duration.clone(),
3115                prefix: prefix.clone(),
3116                input: Box::new(input.deep_clone(seen_tees)),
3117                metadata: metadata.clone(),
3118            },
3119        }
3120    }
3121
3122    #[cfg(feature = "build")]
3123    pub fn emit_core(
3124        &mut self,
3125        builders_or_callback: &mut BuildersOrCallback<
3126            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3127            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3128        >,
3129        seen_tees: &mut SeenSharedNodes,
3130        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3131        next_stmt_id: &mut crate::Counter<StmtId>,
3132        fold_hooked_idents: &mut HashSet<String>,
3133    ) -> syn::Ident {
3134        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3135
3136        self.transform_bottom_up(
3137            &mut |node: &mut HydroNode| {
3138                let out_location = node.metadata().location_id.clone();
3139                match node {
3140                    HydroNode::Placeholder => {
3141                        panic!()
3142                    }
3143
3144                    HydroNode::Cast { .. } => {
3145                        // Cast passes through the input ident unchanged
3146                        // The input ident is already on the stack from processing the child
3147                        let _ = next_stmt_id.get_and_increment();
3148                        match builders_or_callback {
3149                            BuildersOrCallback::Builders(_) => {}
3150                            BuildersOrCallback::Callback(_, node_callback) => {
3151                                node_callback(node, next_stmt_id);
3152                            }
3153                        }
3154                        // input_ident stays on stack as output
3155                    }
3156
3157                    HydroNode::UnboundSingleton { .. } => {
3158                        let inner_ident = ident_stack.pop().unwrap();
3159
3160                        let stmt_id = next_stmt_id.get_and_increment();
3161                        let out_ident =
3162                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3163
3164                        match builders_or_callback {
3165                            BuildersOrCallback::Builders(graph_builders) => {
3166                                if graph_builders.singleton_intermediates() {
3167                                    let builder = graph_builders.get_dfir_mut(&out_location);
3168                                    builder.add_dfir(
3169                                        parse_quote! {
3170                                            #out_ident = #inner_ident;
3171                                        },
3172                                        None,
3173                                        None,
3174                                    );
3175                                } else {
3176                                    let builder = graph_builders.get_dfir_mut(&out_location);
3177                                    builder.add_dfir(
3178                                        parse_quote! {
3179                                            #out_ident = #inner_ident -> persist::<'static>();
3180                                        },
3181                                        None,
3182                                        None,
3183                                    );
3184                                }
3185                            }
3186                            BuildersOrCallback::Callback(_, node_callback) => {
3187                                node_callback(node, next_stmt_id);
3188                            }
3189                        }
3190
3191                        ident_stack.push(out_ident);
3192                    }
3193
3194                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3195                        let inner_ident = ident_stack.pop().unwrap();
3196
3197                        let stmt_id = next_stmt_id.get_and_increment();
3198                        let out_ident =
3199                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3200
3201                        match builders_or_callback {
3202                            BuildersOrCallback::Builders(graph_builders) => {
3203                                graph_builders.assert_is_consistent(
3204                                    *trusted,
3205                                    &inner.metadata().location_id,
3206                                    inner_ident,
3207                                    &out_ident,
3208                                );
3209                            }
3210                            BuildersOrCallback::Callback(_, node_callback) => {
3211                                node_callback(node, next_stmt_id);
3212                            }
3213                        }
3214
3215                        ident_stack.push(out_ident);
3216                    }
3217
3218                    HydroNode::ObserveNonDet {
3219                        inner,
3220                        trusted,
3221                        metadata,
3222                        ..
3223                    } => {
3224                        let inner_ident = ident_stack.pop().unwrap();
3225
3226                        let stmt_id = next_stmt_id.get_and_increment();
3227                        let observe_ident =
3228                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3229
3230                        match builders_or_callback {
3231                            BuildersOrCallback::Builders(graph_builders) => {
3232                                graph_builders.observe_nondet(
3233                                    *trusted,
3234                                    &inner.metadata().location_id,
3235                                    inner_ident,
3236                                    &inner.metadata().collection_kind,
3237                                    &observe_ident,
3238                                    &metadata.collection_kind,
3239                                    &metadata.op,
3240                                );
3241                            }
3242                            BuildersOrCallback::Callback(_, node_callback) => {
3243                                node_callback(node, next_stmt_id);
3244                            }
3245                        }
3246
3247                        ident_stack.push(observe_ident);
3248                    }
3249
3250                    HydroNode::Batch {
3251                        inner, metadata, ..
3252                    } => {
3253                        let inner_ident = ident_stack.pop().unwrap();
3254
3255                        let stmt_id = next_stmt_id.get_and_increment();
3256                        let batch_ident =
3257                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3258
3259                        match builders_or_callback {
3260                            BuildersOrCallback::Builders(graph_builders) => {
3261                                graph_builders.batch(
3262                                    inner_ident,
3263                                    &inner.metadata().location_id,
3264                                    &inner.metadata().collection_kind,
3265                                    &batch_ident,
3266                                    &out_location,
3267                                    &metadata.op,
3268                                    fold_hooked_idents,
3269                                );
3270                            }
3271                            BuildersOrCallback::Callback(_, node_callback) => {
3272                                node_callback(node, next_stmt_id);
3273                            }
3274                        }
3275
3276                        ident_stack.push(batch_ident);
3277                    }
3278
3279                    HydroNode::YieldConcat { inner, .. } => {
3280                        let inner_ident = ident_stack.pop().unwrap();
3281
3282                        let stmt_id = next_stmt_id.get_and_increment();
3283                        let yield_ident =
3284                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3285
3286                        match builders_or_callback {
3287                            BuildersOrCallback::Builders(graph_builders) => {
3288                                graph_builders.yield_from_tick(
3289                                    inner_ident,
3290                                    &inner.metadata().location_id,
3291                                    &inner.metadata().collection_kind,
3292                                    &yield_ident,
3293                                    &out_location,
3294                                );
3295                            }
3296                            BuildersOrCallback::Callback(_, node_callback) => {
3297                                node_callback(node, next_stmt_id);
3298                            }
3299                        }
3300
3301                        ident_stack.push(yield_ident);
3302                    }
3303
3304                    HydroNode::BeginAtomic { inner, metadata } => {
3305                        let inner_ident = ident_stack.pop().unwrap();
3306
3307                        let stmt_id = next_stmt_id.get_and_increment();
3308                        let begin_ident =
3309                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3310
3311                        match builders_or_callback {
3312                            BuildersOrCallback::Builders(graph_builders) => {
3313                                graph_builders.begin_atomic(
3314                                    inner_ident,
3315                                    &inner.metadata().location_id,
3316                                    &inner.metadata().collection_kind,
3317                                    &begin_ident,
3318                                    &out_location,
3319                                    &metadata.op,
3320                                );
3321                            }
3322                            BuildersOrCallback::Callback(_, node_callback) => {
3323                                node_callback(node, next_stmt_id);
3324                            }
3325                        }
3326
3327                        ident_stack.push(begin_ident);
3328                    }
3329
3330                    HydroNode::EndAtomic { inner, .. } => {
3331                        let inner_ident = ident_stack.pop().unwrap();
3332
3333                        let stmt_id = next_stmt_id.get_and_increment();
3334                        let end_ident =
3335                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3336
3337                        match builders_or_callback {
3338                            BuildersOrCallback::Builders(graph_builders) => {
3339                                graph_builders.end_atomic(
3340                                    inner_ident,
3341                                    &inner.metadata().location_id,
3342                                    &inner.metadata().collection_kind,
3343                                    &end_ident,
3344                                );
3345                            }
3346                            BuildersOrCallback::Callback(_, node_callback) => {
3347                                node_callback(node, next_stmt_id);
3348                            }
3349                        }
3350
3351                        ident_stack.push(end_ident);
3352                    }
3353
3354                    HydroNode::Source {
3355                        source, metadata, ..
3356                    } => {
3357                        if let HydroSource::ExternalNetwork() = source {
3358                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3359                        } else {
3360                            let stmt_id = next_stmt_id.get_and_increment();
3361                            let source_ident =
3362                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3363
3364                            let source_stmt = match source {
3365                                HydroSource::Stream(expr) => {
3366                                    debug_assert!(metadata.location_id.is_top_level());
3367                                    parse_quote! {
3368                                        #source_ident = source_stream(#expr);
3369                                    }
3370                                }
3371
3372                                HydroSource::ExternalNetwork() => {
3373                                    unreachable!()
3374                                }
3375
3376                                HydroSource::Iter(expr) => {
3377                                    if metadata.location_id.is_top_level() {
3378                                        parse_quote! {
3379                                            #source_ident = source_iter(#expr);
3380                                        }
3381                                    } else {
3382                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3383                                        parse_quote! {
3384                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3385                                        }
3386                                    }
3387                                }
3388
3389                                HydroSource::Spin() => {
3390                                    debug_assert!(metadata.location_id.is_top_level());
3391                                    parse_quote! {
3392                                        #source_ident = spin();
3393                                    }
3394                                }
3395
3396                                HydroSource::ClusterMembers(target_loc, state) => {
3397                                    debug_assert!(metadata.location_id.is_top_level());
3398
3399                                    let members_tee_ident = syn::Ident::new(
3400                                        &format!(
3401                                            "__cluster_members_tee_{}_{}",
3402                                            metadata.location_id.root().key(),
3403                                            target_loc.key(),
3404                                        ),
3405                                        Span::call_site(),
3406                                    );
3407
3408                                    match state {
3409                                        ClusterMembersState::Stream(d) => {
3410                                            parse_quote! {
3411                                                #members_tee_ident = source_stream(#d) -> tee();
3412                                                #source_ident = #members_tee_ident;
3413                                            }
3414                                        },
3415                                        ClusterMembersState::Uninit => syn::parse_quote! {
3416                                            #source_ident = source_stream(DUMMY);
3417                                        },
3418                                        ClusterMembersState::Tee(..) => parse_quote! {
3419                                            #source_ident = #members_tee_ident;
3420                                        },
3421                                    }
3422                                }
3423
3424                                HydroSource::Embedded(ident) => {
3425                                    parse_quote! {
3426                                        #source_ident = source_stream(#ident);
3427                                    }
3428                                }
3429
3430                                HydroSource::EmbeddedSingleton(ident) => {
3431                                    parse_quote! {
3432                                        #source_ident = source_iter([#ident]);
3433                                    }
3434                                }
3435                            };
3436
3437                            match builders_or_callback {
3438                                BuildersOrCallback::Builders(graph_builders) => {
3439                                    let builder = graph_builders.get_dfir_mut(&out_location);
3440                                    builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3441                                }
3442                                BuildersOrCallback::Callback(_, node_callback) => {
3443                                    node_callback(node, next_stmt_id);
3444                                }
3445                            }
3446
3447                            ident_stack.push(source_ident);
3448                        }
3449                    }
3450
3451                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3452                        let stmt_id = next_stmt_id.get_and_increment();
3453                        let source_ident =
3454                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3455
3456                        match builders_or_callback {
3457                            BuildersOrCallback::Builders(graph_builders) => {
3458                                let builder = graph_builders.get_dfir_mut(&out_location);
3459
3460                                if *first_tick_only {
3461                                    assert!(
3462                                        !metadata.location_id.is_top_level(),
3463                                        "first_tick_only SingletonSource must be inside a tick"
3464                                    );
3465                                }
3466
3467                                if *first_tick_only
3468                                    || (metadata.location_id.is_top_level()
3469                                        && metadata.collection_kind.is_bounded())
3470                                {
3471                                    builder.add_dfir(
3472                                        parse_quote! {
3473                                            #source_ident = source_iter([#value]);
3474                                        },
3475                                        None,
3476                                        Some(&stmt_id.to_string()),
3477                                    );
3478                                } else {
3479                                    builder.add_dfir(
3480                                        parse_quote! {
3481                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3482                                        },
3483                                        None,
3484                                        Some(&stmt_id.to_string()),
3485                                    );
3486                                }
3487                            }
3488                            BuildersOrCallback::Callback(_, node_callback) => {
3489                                node_callback(node, next_stmt_id);
3490                            }
3491                        }
3492
3493                        ident_stack.push(source_ident);
3494                    }
3495
3496                    HydroNode::CycleSource { cycle_id, .. } => {
3497                        let ident = cycle_id.as_ident();
3498
3499                        // consume a stmt id even though we did not emit anything so that we can instrument this
3500                        let _ = next_stmt_id.get_and_increment();
3501
3502                        match builders_or_callback {
3503                            BuildersOrCallback::Builders(_) => {}
3504                            BuildersOrCallback::Callback(_, node_callback) => {
3505                                node_callback(node, next_stmt_id);
3506                            }
3507                        }
3508
3509                        ident_stack.push(ident);
3510                    }
3511
3512                    HydroNode::Tee { inner, .. } => {
3513                        // we consume a stmt id regardless of if we emit the tee() operator,
3514                        // so that during rewrites we touch all recipients of the tee()
3515                        let stmt_id = next_stmt_id.get_and_increment();
3516
3517                        let ret_ident = if let Some(built_idents) =
3518                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3519                        {
3520                            match builders_or_callback {
3521                                BuildersOrCallback::Builders(_) => {}
3522                                BuildersOrCallback::Callback(_, node_callback) => {
3523                                    node_callback(node, next_stmt_id);
3524                                }
3525                            }
3526
3527                            built_idents[0].clone()
3528                        } else {
3529                            // The inner node was already processed by transform_bottom_up,
3530                            // so its ident is on the stack
3531                            let inner_ident = ident_stack.pop().unwrap();
3532
3533                            let tee_ident =
3534                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3535
3536                            built_tees.insert(
3537                                inner.0.as_ref() as *const RefCell<HydroNode>,
3538                                vec![tee_ident.clone()],
3539                            );
3540
3541                            match builders_or_callback {
3542                                BuildersOrCallback::Builders(graph_builders) => {
3543                                    // NOTE: With `forward_ref`, the fold codegen may not have
3544                                    // run yet when we reach this tee, so `fold_hooked_idents`
3545                                    // might not contain the inner ident. In that case we won't
3546                                    // propagate the "hooked" status to the tee and the
3547                                    // downstream singleton batch will use the normal
3548                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3549                                    // This is not a soundness issue: the fallback hook still
3550                                    // produces correct behavior, just with a redundant decision
3551                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3552                                    // fix ordering so forward_ref folds are always processed
3553                                    // before their downstream tees.
3554                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3555                                        fold_hooked_idents.insert(tee_ident.to_string());
3556                                    }
3557                                    let builder = graph_builders.get_dfir_mut(&out_location);
3558                                    builder.add_dfir(
3559                                        parse_quote! {
3560                                            #tee_ident = #inner_ident -> tee();
3561                                        },
3562                                        None,
3563                                        Some(&stmt_id.to_string()),
3564                                    );
3565                                }
3566                                BuildersOrCallback::Callback(_, node_callback) => {
3567                                    node_callback(node, next_stmt_id);
3568                                }
3569                            }
3570
3571                            tee_ident
3572                        };
3573
3574                        ident_stack.push(ret_ident);
3575                    }
3576
3577                    HydroNode::Reference { inner, kind, .. } => {
3578                        // we consume a stmt id regardless of if we emit the operator,
3579                        // so that during rewrites we touch all recipients
3580                        let stmt_id = next_stmt_id.get_and_increment();
3581
3582                        let ret_ident = if let Some(built_idents) =
3583                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3584                        {
3585                            built_idents[0].clone()
3586                        } else {
3587                            let inner_ident = ident_stack.pop().unwrap();
3588
3589                            let ref_ident =
3590                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3591
3592                            built_tees.insert(
3593                                inner.0.as_ref() as *const RefCell<HydroNode>,
3594                                vec![ref_ident.clone()],
3595                            );
3596
3597                            match builders_or_callback {
3598                                BuildersOrCallback::Builders(graph_builders) => {
3599                                    let builder = graph_builders.get_dfir_mut(&out_location);
3600                                    let op_ident = syn::Ident::new(
3601                                        match kind {
3602                                            crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3603                                            crate::handoff_ref::HandoffRefKind::Optional => "optional",
3604                                            crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3605                                        },
3606                                        Span::call_site(),
3607                                    );
3608                                    builder.add_dfir(
3609                                        parse_quote! {
3610                                            #ref_ident = #inner_ident -> #op_ident();
3611                                        },
3612                                        None,
3613                                        Some(&stmt_id.to_string()),
3614                                    );
3615                                }
3616                                BuildersOrCallback::Callback(_, node_callback) => {
3617                                    node_callback(node, next_stmt_id);
3618                                }
3619                            }
3620
3621                            ref_ident
3622                        };
3623
3624                        ident_stack.push(ret_ident);
3625                    }
3626
3627                    HydroNode::Partition {
3628                        inner, f, is_true, ..
3629                    } => {
3630                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3631                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3632                        let stmt_id = next_stmt_id.get_and_increment();
3633
3634                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3635                            match builders_or_callback {
3636                                BuildersOrCallback::Builders(_) => {}
3637                                BuildersOrCallback::Callback(_, node_callback) => {
3638                                    node_callback(node, next_stmt_id);
3639                                }
3640                            }
3641
3642                            let idx = if is_true { 0 } else { 1 };
3643                            built_idents[idx].clone()
3644                        } else {
3645                            // The inner node was already processed by transform_bottom_up,
3646                            // so its ident is on the stack
3647                            let inner_ident = ident_stack.pop().unwrap();
3648                            let f_tokens = f.emit_tokens(&mut ident_stack);
3649
3650                            let partition_ident = syn::Ident::new(
3651                                &format!("stream_{}_partition", stmt_id),
3652                                Span::call_site(),
3653                            );
3654                            let true_ident = syn::Ident::new(
3655                                &format!("stream_{}_true", stmt_id),
3656                                Span::call_site(),
3657                            );
3658                            let false_ident = syn::Ident::new(
3659                                &format!("stream_{}_false", stmt_id),
3660                                Span::call_site(),
3661                            );
3662
3663                            built_tees.insert(
3664                                ptr,
3665                                vec![true_ident.clone(), false_ident.clone()],
3666                            );
3667
3668                            let stmt_id = next_stmt_id.get_and_increment();
3669                            match builders_or_callback {
3670                                BuildersOrCallback::Builders(graph_builders) => {
3671                                    let builder = graph_builders.get_dfir_mut(&out_location);
3672                                    builder.add_dfir(
3673                                        parse_quote! {
3674                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3675                                            #true_ident = #partition_ident[0];
3676                                            #false_ident = #partition_ident[1];
3677                                        },
3678                                        None,
3679                                        Some(&stmt_id.to_string()),
3680                                    );
3681                                }
3682                                BuildersOrCallback::Callback(_, node_callback) => {
3683                                    node_callback(node, next_stmt_id);
3684                                }
3685                            }
3686
3687                            if is_true { true_ident } else { false_ident }
3688                        };
3689
3690                        ident_stack.push(ret_ident);
3691                    }
3692
3693                    HydroNode::Chain { .. } => {
3694                        // Children are processed left-to-right, so second is on top
3695                        let second_ident = ident_stack.pop().unwrap();
3696                        let first_ident = ident_stack.pop().unwrap();
3697
3698                        let stmt_id = next_stmt_id.get_and_increment();
3699                        let chain_ident =
3700                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3701
3702                        match builders_or_callback {
3703                            BuildersOrCallback::Builders(graph_builders) => {
3704                                let builder = graph_builders.get_dfir_mut(&out_location);
3705                                builder.add_dfir(
3706                                    parse_quote! {
3707                                        #chain_ident = chain();
3708                                        #first_ident -> [0]#chain_ident;
3709                                        #second_ident -> [1]#chain_ident;
3710                                    },
3711                                    None,
3712                                    Some(&stmt_id.to_string()),
3713                                );
3714                            }
3715                            BuildersOrCallback::Callback(_, node_callback) => {
3716                                node_callback(node, next_stmt_id);
3717                            }
3718                        }
3719
3720                        ident_stack.push(chain_ident);
3721                    }
3722
3723                    HydroNode::MergeOrdered { first, metadata, .. } => {
3724                        let second_ident = ident_stack.pop().unwrap();
3725                        let first_ident = ident_stack.pop().unwrap();
3726
3727                        let stmt_id = next_stmt_id.get_and_increment();
3728                        let merge_ident =
3729                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3730
3731                        match builders_or_callback {
3732                            BuildersOrCallback::Builders(graph_builders) => {
3733                                graph_builders.merge_ordered(
3734                                    &first.metadata().location_id,
3735                                    first_ident,
3736                                    second_ident,
3737                                    &merge_ident,
3738                                    &first.metadata().collection_kind,
3739                                    &metadata.op,
3740                                    Some(&stmt_id.to_string()),
3741                                );
3742                            }
3743                            BuildersOrCallback::Callback(_, node_callback) => {
3744                                node_callback(node, next_stmt_id);
3745                            }
3746                        }
3747
3748                        ident_stack.push(merge_ident);
3749                    }
3750
3751                    HydroNode::ChainFirst { .. } => {
3752                        let second_ident = ident_stack.pop().unwrap();
3753                        let first_ident = ident_stack.pop().unwrap();
3754
3755                        let stmt_id = next_stmt_id.get_and_increment();
3756                        let chain_ident =
3757                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3758
3759                        match builders_or_callback {
3760                            BuildersOrCallback::Builders(graph_builders) => {
3761                                let builder = graph_builders.get_dfir_mut(&out_location);
3762                                builder.add_dfir(
3763                                    parse_quote! {
3764                                        #chain_ident = chain_first_n(1);
3765                                        #first_ident -> [0]#chain_ident;
3766                                        #second_ident -> [1]#chain_ident;
3767                                    },
3768                                    None,
3769                                    Some(&stmt_id.to_string()),
3770                                );
3771                            }
3772                            BuildersOrCallback::Callback(_, node_callback) => {
3773                                node_callback(node, next_stmt_id);
3774                            }
3775                        }
3776
3777                        ident_stack.push(chain_ident);
3778                    }
3779
3780                    HydroNode::CrossSingleton { right, .. } => {
3781                        let right_ident = ident_stack.pop().unwrap();
3782                        let left_ident = ident_stack.pop().unwrap();
3783
3784                        let stmt_id = next_stmt_id.get_and_increment();
3785                        let cross_ident =
3786                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3787
3788                        match builders_or_callback {
3789                            BuildersOrCallback::Builders(graph_builders) => {
3790                                let builder = graph_builders.get_dfir_mut(&out_location);
3791
3792                                if right.metadata().location_id.is_top_level()
3793                                    && right.metadata().collection_kind.is_bounded()
3794                                {
3795                                    builder.add_dfir(
3796                                        parse_quote! {
3797                                            #cross_ident = cross_singleton::<'static>();
3798                                            #left_ident -> [input]#cross_ident;
3799                                            #right_ident -> [single]#cross_ident;
3800                                        },
3801                                        None,
3802                                        Some(&stmt_id.to_string()),
3803                                    );
3804                                } else {
3805                                    builder.add_dfir(
3806                                        parse_quote! {
3807                                            #cross_ident = cross_singleton();
3808                                            #left_ident -> [input]#cross_ident;
3809                                            #right_ident -> [single]#cross_ident;
3810                                        },
3811                                        None,
3812                                        Some(&stmt_id.to_string()),
3813                                    );
3814                                }
3815                            }
3816                            BuildersOrCallback::Callback(_, node_callback) => {
3817                                node_callback(node, next_stmt_id);
3818                            }
3819                        }
3820
3821                        ident_stack.push(cross_ident);
3822                    }
3823
3824                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3825                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3826                            parse_quote!(cross_join_multiset)
3827                        } else {
3828                            parse_quote!(join_multiset)
3829                        };
3830
3831                        let (HydroNode::CrossProduct { left, right, .. }
3832                        | HydroNode::Join { left, right, .. }) = node
3833                        else {
3834                            unreachable!()
3835                        };
3836
3837                        let is_top_level = left.metadata().location_id.is_top_level()
3838                            && right.metadata().location_id.is_top_level();
3839                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3840                            quote!('static)
3841                        } else {
3842                            quote!('tick)
3843                        };
3844
3845                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3846                            quote!('static)
3847                        } else {
3848                            quote!('tick)
3849                        };
3850
3851                        let right_ident = ident_stack.pop().unwrap();
3852                        let left_ident = ident_stack.pop().unwrap();
3853
3854                        let stmt_id = next_stmt_id.get_and_increment();
3855                        let stream_ident =
3856                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3857
3858                        match builders_or_callback {
3859                            BuildersOrCallback::Builders(graph_builders) => {
3860                                let builder = graph_builders.get_dfir_mut(&out_location);
3861                                builder.add_dfir(
3862                                    if is_top_level {
3863                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3864                                        // a multiset_delta() to negate the replay behavior
3865                                        parse_quote! {
3866                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3867                                            #left_ident -> [0]#stream_ident;
3868                                            #right_ident -> [1]#stream_ident;
3869                                        }
3870                                    } else {
3871                                        parse_quote! {
3872                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3873                                            #left_ident -> [0]#stream_ident;
3874                                            #right_ident -> [1]#stream_ident;
3875                                        }
3876                                    }
3877                                    ,
3878                                    None,
3879                                    Some(&stmt_id.to_string()),
3880                                );
3881                            }
3882                            BuildersOrCallback::Callback(_, node_callback) => {
3883                                node_callback(node, next_stmt_id);
3884                            }
3885                        }
3886
3887                        ident_stack.push(stream_ident);
3888                    }
3889
3890                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3891                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3892                            parse_quote!(difference)
3893                        } else {
3894                            parse_quote!(anti_join)
3895                        };
3896
3897                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3898                            node
3899                        else {
3900                            unreachable!()
3901                        };
3902
3903                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3904                            quote!('static)
3905                        } else {
3906                            quote!('tick)
3907                        };
3908
3909                        let neg_ident = ident_stack.pop().unwrap();
3910                        let pos_ident = ident_stack.pop().unwrap();
3911
3912                        let stmt_id = next_stmt_id.get_and_increment();
3913                        let stream_ident =
3914                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3915
3916                        match builders_or_callback {
3917                            BuildersOrCallback::Builders(graph_builders) => {
3918                                let builder = graph_builders.get_dfir_mut(&out_location);
3919                                builder.add_dfir(
3920                                    parse_quote! {
3921                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3922                                        #pos_ident -> [pos]#stream_ident;
3923                                        #neg_ident -> [neg]#stream_ident;
3924                                    },
3925                                    None,
3926                                    Some(&stmt_id.to_string()),
3927                                );
3928                            }
3929                            BuildersOrCallback::Callback(_, node_callback) => {
3930                                node_callback(node, next_stmt_id);
3931                            }
3932                        }
3933
3934                        ident_stack.push(stream_ident);
3935                    }
3936
3937                    HydroNode::JoinHalf { .. } => {
3938                        let HydroNode::JoinHalf { right, .. } = node else {
3939                            unreachable!()
3940                        };
3941
3942                        assert!(
3943                            right.metadata().collection_kind.is_bounded(),
3944                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3945                            right.metadata().collection_kind
3946                        );
3947
3948                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3949                            quote!('static)
3950                        } else {
3951                            quote!('tick)
3952                        };
3953
3954                        let build_ident = ident_stack.pop().unwrap();
3955                        let probe_ident = ident_stack.pop().unwrap();
3956
3957                        let stmt_id = next_stmt_id.get_and_increment();
3958                        let stream_ident =
3959                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3960
3961                        match builders_or_callback {
3962                            BuildersOrCallback::Builders(graph_builders) => {
3963                                let builder = graph_builders.get_dfir_mut(&out_location);
3964                                builder.add_dfir(
3965                                    parse_quote! {
3966                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3967                                        #probe_ident -> [probe]#stream_ident;
3968                                        #build_ident -> [build]#stream_ident;
3969                                    },
3970                                    None,
3971                                    Some(&stmt_id.to_string()),
3972                                );
3973                            }
3974                            BuildersOrCallback::Callback(_, node_callback) => {
3975                                node_callback(node, next_stmt_id);
3976                            }
3977                        }
3978
3979                        ident_stack.push(stream_ident);
3980                    }
3981
3982                    HydroNode::ResolveFutures { .. } => {
3983                        let input_ident = ident_stack.pop().unwrap();
3984
3985                        let stmt_id = next_stmt_id.get_and_increment();
3986                        let futures_ident =
3987                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3988
3989                        match builders_or_callback {
3990                            BuildersOrCallback::Builders(graph_builders) => {
3991                                let builder = graph_builders.get_dfir_mut(&out_location);
3992                                builder.add_dfir(
3993                                    parse_quote! {
3994                                        #futures_ident = #input_ident -> resolve_futures();
3995                                    },
3996                                    None,
3997                                    Some(&stmt_id.to_string()),
3998                                );
3999                            }
4000                            BuildersOrCallback::Callback(_, node_callback) => {
4001                                node_callback(node, next_stmt_id);
4002                            }
4003                        }
4004
4005                        ident_stack.push(futures_ident);
4006                    }
4007
4008                    HydroNode::ResolveFuturesBlocking { .. } => {
4009                        let input_ident = ident_stack.pop().unwrap();
4010
4011                        let stmt_id = next_stmt_id.get_and_increment();
4012                        let futures_ident =
4013                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4014
4015                        match builders_or_callback {
4016                            BuildersOrCallback::Builders(graph_builders) => {
4017                                let builder = graph_builders.get_dfir_mut(&out_location);
4018                                builder.add_dfir(
4019                                    parse_quote! {
4020                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4021                                    },
4022                                    None,
4023                                    Some(&stmt_id.to_string()),
4024                                );
4025                            }
4026                            BuildersOrCallback::Callback(_, node_callback) => {
4027                                node_callback(node, next_stmt_id);
4028                            }
4029                        }
4030
4031                        ident_stack.push(futures_ident);
4032                    }
4033
4034                    HydroNode::ResolveFuturesOrdered { .. } => {
4035                        let input_ident = ident_stack.pop().unwrap();
4036
4037                        let stmt_id = next_stmt_id.get_and_increment();
4038                        let futures_ident =
4039                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4040
4041                        match builders_or_callback {
4042                            BuildersOrCallback::Builders(graph_builders) => {
4043                                let builder = graph_builders.get_dfir_mut(&out_location);
4044                                builder.add_dfir(
4045                                    parse_quote! {
4046                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4047                                    },
4048                                    None,
4049                                    Some(&stmt_id.to_string()),
4050                                );
4051                            }
4052                            BuildersOrCallback::Callback(_, node_callback) => {
4053                                node_callback(node, next_stmt_id);
4054                            }
4055                        }
4056
4057                        ident_stack.push(futures_ident);
4058                    }
4059
4060                    HydroNode::Map { f, .. } => {
4061                        // Pop input ident (pushed last by transform_children).
4062                        let input_ident = ident_stack.pop().unwrap();
4063                        let f_tokens = f.emit_tokens(&mut ident_stack);
4064
4065                        let stmt_id = next_stmt_id.get_and_increment();
4066                        let map_ident =
4067                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4068
4069                        match builders_or_callback {
4070                            BuildersOrCallback::Builders(graph_builders) => {
4071                                let builder = graph_builders.get_dfir_mut(&out_location);
4072                                builder.add_dfir(
4073                                    parse_quote! {
4074                                        #map_ident = #input_ident -> map(#f_tokens);
4075                                    },
4076                                    None,
4077                                    Some(&stmt_id.to_string()),
4078                                );
4079                            }
4080                            BuildersOrCallback::Callback(_, node_callback) => {
4081                                node_callback(node, next_stmt_id);
4082                            }
4083                        }
4084
4085                        ident_stack.push(map_ident);
4086                    }
4087
4088                    HydroNode::FlatMap { f, .. } => {
4089                        let input_ident = ident_stack.pop().unwrap();
4090                        let f_tokens = f.emit_tokens(&mut ident_stack);
4091
4092                        let stmt_id = next_stmt_id.get_and_increment();
4093                        let flat_map_ident =
4094                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4095
4096                        match builders_or_callback {
4097                            BuildersOrCallback::Builders(graph_builders) => {
4098                                let builder = graph_builders.get_dfir_mut(&out_location);
4099                                builder.add_dfir(
4100                                    parse_quote! {
4101                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4102                                    },
4103                                    None,
4104                                    Some(&stmt_id.to_string()),
4105                                );
4106                            }
4107                            BuildersOrCallback::Callback(_, node_callback) => {
4108                                node_callback(node, next_stmt_id);
4109                            }
4110                        }
4111
4112                        ident_stack.push(flat_map_ident);
4113                    }
4114
4115                    HydroNode::FlatMapStreamBlocking { f, .. } => {
4116                        let input_ident = ident_stack.pop().unwrap();
4117                        let f_tokens = f.emit_tokens(&mut ident_stack);
4118
4119                        let stmt_id = next_stmt_id.get_and_increment();
4120                        let flat_map_stream_blocking_ident =
4121                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4122
4123                        match builders_or_callback {
4124                            BuildersOrCallback::Builders(graph_builders) => {
4125                                let builder = graph_builders.get_dfir_mut(&out_location);
4126                                builder.add_dfir(
4127                                    parse_quote! {
4128                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4129                                    },
4130                                    None,
4131                                    Some(&stmt_id.to_string()),
4132                                );
4133                            }
4134                            BuildersOrCallback::Callback(_, node_callback) => {
4135                                node_callback(node, next_stmt_id);
4136                            }
4137                        }
4138
4139                        ident_stack.push(flat_map_stream_blocking_ident);
4140                    }
4141
4142                    HydroNode::Filter { f, .. } => {
4143                        let input_ident = ident_stack.pop().unwrap();
4144                        let f_tokens = f.emit_tokens(&mut ident_stack);
4145
4146                        let stmt_id = next_stmt_id.get_and_increment();
4147                        let filter_ident =
4148                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4149
4150                        match builders_or_callback {
4151                            BuildersOrCallback::Builders(graph_builders) => {
4152                                let builder = graph_builders.get_dfir_mut(&out_location);
4153                                builder.add_dfir(
4154                                    parse_quote! {
4155                                        #filter_ident = #input_ident -> filter(#f_tokens);
4156                                    },
4157                                    None,
4158                                    Some(&stmt_id.to_string()),
4159                                );
4160                            }
4161                            BuildersOrCallback::Callback(_, node_callback) => {
4162                                node_callback(node, next_stmt_id);
4163                            }
4164                        }
4165
4166                        ident_stack.push(filter_ident);
4167                    }
4168
4169                    HydroNode::FilterMap { f, .. } => {
4170                        let input_ident = ident_stack.pop().unwrap();
4171                        let f_tokens = f.emit_tokens(&mut ident_stack);
4172
4173                        let stmt_id = next_stmt_id.get_and_increment();
4174                        let filter_map_ident =
4175                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4176
4177                        match builders_or_callback {
4178                            BuildersOrCallback::Builders(graph_builders) => {
4179                                let builder = graph_builders.get_dfir_mut(&out_location);
4180                                builder.add_dfir(
4181                                    parse_quote! {
4182                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4183                                    },
4184                                    None,
4185                                    Some(&stmt_id.to_string()),
4186                                );
4187                            }
4188                            BuildersOrCallback::Callback(_, node_callback) => {
4189                                node_callback(node, next_stmt_id);
4190                            }
4191                        }
4192
4193                        ident_stack.push(filter_map_ident);
4194                    }
4195
4196                    HydroNode::Sort { .. } => {
4197                        let input_ident = ident_stack.pop().unwrap();
4198
4199                        let stmt_id = next_stmt_id.get_and_increment();
4200                        let sort_ident =
4201                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4202
4203                        match builders_or_callback {
4204                            BuildersOrCallback::Builders(graph_builders) => {
4205                                let builder = graph_builders.get_dfir_mut(&out_location);
4206                                builder.add_dfir(
4207                                    parse_quote! {
4208                                        #sort_ident = #input_ident -> sort();
4209                                    },
4210                                    None,
4211                                    Some(&stmt_id.to_string()),
4212                                );
4213                            }
4214                            BuildersOrCallback::Callback(_, node_callback) => {
4215                                node_callback(node, next_stmt_id);
4216                            }
4217                        }
4218
4219                        ident_stack.push(sort_ident);
4220                    }
4221
4222                    HydroNode::DeferTick { .. } => {
4223                        let input_ident = ident_stack.pop().unwrap();
4224
4225                        let stmt_id = next_stmt_id.get_and_increment();
4226                        let defer_tick_ident =
4227                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4228
4229                        match builders_or_callback {
4230                            BuildersOrCallback::Builders(graph_builders) => {
4231                                let builder = graph_builders.get_dfir_mut(&out_location);
4232                                builder.add_dfir(
4233                                    parse_quote! {
4234                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4235                                    },
4236                                    None,
4237                                    Some(&stmt_id.to_string()),
4238                                );
4239                            }
4240                            BuildersOrCallback::Callback(_, node_callback) => {
4241                                node_callback(node, next_stmt_id);
4242                            }
4243                        }
4244
4245                        ident_stack.push(defer_tick_ident);
4246                    }
4247
4248                    HydroNode::Enumerate { input, .. } => {
4249                        let input_ident = ident_stack.pop().unwrap();
4250
4251                        let stmt_id = next_stmt_id.get_and_increment();
4252                        let enumerate_ident =
4253                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4254
4255                        match builders_or_callback {
4256                            BuildersOrCallback::Builders(graph_builders) => {
4257                                let builder = graph_builders.get_dfir_mut(&out_location);
4258                                let lifetime = if input.metadata().location_id.is_top_level() {
4259                                    quote!('static)
4260                                } else {
4261                                    quote!('tick)
4262                                };
4263                                builder.add_dfir(
4264                                    parse_quote! {
4265                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4266                                    },
4267                                    None,
4268                                    Some(&stmt_id.to_string()),
4269                                );
4270                            }
4271                            BuildersOrCallback::Callback(_, node_callback) => {
4272                                node_callback(node, next_stmt_id);
4273                            }
4274                        }
4275
4276                        ident_stack.push(enumerate_ident);
4277                    }
4278
4279                    HydroNode::Inspect { f, .. } => {
4280                        let input_ident = ident_stack.pop().unwrap();
4281                        let f_tokens = f.emit_tokens(&mut ident_stack);
4282
4283                        let stmt_id = next_stmt_id.get_and_increment();
4284                        let inspect_ident =
4285                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4286
4287                        match builders_or_callback {
4288                            BuildersOrCallback::Builders(graph_builders) => {
4289                                let builder = graph_builders.get_dfir_mut(&out_location);
4290                                builder.add_dfir(
4291                                    parse_quote! {
4292                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4293                                    },
4294                                    None,
4295                                    Some(&stmt_id.to_string()),
4296                                );
4297                            }
4298                            BuildersOrCallback::Callback(_, node_callback) => {
4299                                node_callback(node, next_stmt_id);
4300                            }
4301                        }
4302
4303                        ident_stack.push(inspect_ident);
4304                    }
4305
4306                    HydroNode::Unique { input, .. } => {
4307                        let input_ident = ident_stack.pop().unwrap();
4308
4309                        let stmt_id = next_stmt_id.get_and_increment();
4310                        let unique_ident =
4311                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4312
4313                        match builders_or_callback {
4314                            BuildersOrCallback::Builders(graph_builders) => {
4315                                let builder = graph_builders.get_dfir_mut(&out_location);
4316                                let lifetime = if input.metadata().location_id.is_top_level() {
4317                                    quote!('static)
4318                                } else {
4319                                    quote!('tick)
4320                                };
4321
4322                                builder.add_dfir(
4323                                    parse_quote! {
4324                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4325                                    },
4326                                    None,
4327                                    Some(&stmt_id.to_string()),
4328                                );
4329                            }
4330                            BuildersOrCallback::Callback(_, node_callback) => {
4331                                node_callback(node, next_stmt_id);
4332                            }
4333                        }
4334
4335                        ident_stack.push(unique_ident);
4336                    }
4337
4338                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4339                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4340                            if input.metadata().location_id.is_top_level()
4341                                && input.metadata().collection_kind.is_bounded()
4342                            {
4343                                parse_quote!(fold_no_replay)
4344                            } else {
4345                                parse_quote!(fold)
4346                            }
4347                        } else if matches!(node, HydroNode::Scan { .. }) {
4348                            parse_quote!(scan)
4349                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4350                            parse_quote!(scan_async_blocking)
4351                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4352                            if input.metadata().location_id.is_top_level()
4353                                && input.metadata().collection_kind.is_bounded()
4354                            {
4355                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4356                            } else {
4357                                parse_quote!(fold_keyed)
4358                            }
4359                        } else {
4360                            unreachable!()
4361                        };
4362
4363                        let (HydroNode::Fold { input, .. }
4364                        | HydroNode::FoldKeyed { input, .. }
4365                        | HydroNode::Scan { input, .. }
4366                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4367                        else {
4368                            unreachable!()
4369                        };
4370
4371                        let lifetime = if input.metadata().location_id.is_top_level() {
4372                            quote!('static)
4373                        } else {
4374                            quote!('tick)
4375                        };
4376
4377                        let input_ident = ident_stack.pop().unwrap();
4378
4379                        let (HydroNode::Fold { init, acc, .. }
4380                        | HydroNode::FoldKeyed { init, acc, .. }
4381                        | HydroNode::Scan { init, acc, .. }
4382                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4383                        else {
4384                            unreachable!()
4385                        };
4386
4387                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4388                        let init_tokens = init.emit_tokens(&mut ident_stack);
4389
4390                        let stmt_id = next_stmt_id.get_and_increment();
4391                        let fold_ident =
4392                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4393
4394                        match builders_or_callback {
4395                            BuildersOrCallback::Builders(graph_builders) => {
4396                                if matches!(node, HydroNode::Fold { .. })
4397                                    && node.metadata().location_id.is_top_level()
4398                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4399                                    && graph_builders.singleton_intermediates()
4400                                    && !node.metadata().collection_kind.is_bounded()
4401                                {
4402                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4403                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4404                                        &input.metadata().location_id,
4405                                        &input_ident,
4406                                        &input.metadata().collection_kind,
4407                                        &node.metadata().op,
4408                                    );
4409
4410                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4411                                        let acc: syn::Expr = parse_quote!({
4412                                            let mut __inner = #acc_tokens;
4413                                            move |__state, __batch: Vec<_>| {
4414                                                if __batch.is_empty() {
4415                                                    return None;
4416                                                }
4417                                                for __value in __batch {
4418                                                    __inner(__state, __value);
4419                                                }
4420                                                Some(__state.clone())
4421                                            }
4422                                        });
4423                                        (hooked, acc)
4424                                    } else {
4425                                        let acc: syn::Expr = parse_quote!({
4426                                            let mut __inner = #acc_tokens;
4427                                            move |__state, __value| {
4428                                                __inner(__state, __value);
4429                                                Some(__state.clone())
4430                                            }
4431                                        });
4432                                        (&input_ident, acc)
4433                                    };
4434
4435                                    let builder = graph_builders.get_dfir_mut(&out_location);
4436                                    builder.add_dfir(
4437                                        parse_quote! {
4438                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4439                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4440                                            #fold_ident = chain();
4441                                        },
4442                                        None,
4443                                        Some(&stmt_id.to_string()),
4444                                    );
4445
4446                                    if hooked_input_ident.is_some() {
4447                                        fold_hooked_idents.insert(fold_ident.to_string());
4448                                    }
4449                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4450                                    && node.metadata().location_id.is_top_level()
4451                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4452                                    && graph_builders.singleton_intermediates()
4453                                    && !node.metadata().collection_kind.is_bounded()
4454                                {
4455                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4456                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4457                                        &input.metadata().location_id,
4458                                        &input_ident,
4459                                        &input.metadata().collection_kind,
4460                                        &node.metadata().op,
4461                                    );
4462                                    let builder = graph_builders.get_dfir_mut(&out_location);
4463
4464                                    let wrapped_acc: syn::Expr = parse_quote!({
4465                                        let mut __init = #init_tokens;
4466                                        let mut __inner = #acc_tokens;
4467                                        move |__state, __kv: (_, _)| {
4468                                            // TODO(shadaj): we can avoid the clone when the entry exists
4469                                            let __state = __state
4470                                                .entry(::std::clone::Clone::clone(&__kv.0))
4471                                                .or_insert_with(|| (__init)());
4472                                            __inner(__state, __kv.1);
4473                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4474                                        }
4475                                    });
4476
4477                                    if let Some(hooked_input_ident) = hooked_input_ident {
4478                                        builder.add_dfir(
4479                                            parse_quote! {
4480                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4481                                            },
4482                                            None,
4483                                            Some(&stmt_id.to_string()),
4484                                        );
4485
4486                                        fold_hooked_idents.insert(fold_ident.to_string());
4487                                    } else {
4488                                        builder.add_dfir(
4489                                            parse_quote! {
4490                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4491                                            },
4492                                            None,
4493                                            Some(&stmt_id.to_string()),
4494                                        );
4495                                    }
4496                                } else if (matches!(node, HydroNode::Fold { .. })
4497                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4498                                    && !node.metadata().location_id.is_top_level()
4499                                    && graph_builders.singleton_intermediates()
4500                                {
4501                                    let input_ref = match &*node {
4502                                        HydroNode::Fold { input, .. } => input,
4503                                        HydroNode::FoldKeyed { input, .. } => input,
4504                                        _ => unreachable!(),
4505                                    };
4506                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4507                                        &input_ref.metadata().location_id,
4508                                        &input_ident,
4509                                        &input_ref.metadata().collection_kind,
4510                                        &node.metadata().op,
4511                                    );
4512
4513                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4514                                    let builder = graph_builders.get_dfir_mut(&out_location);
4515                                    builder.add_dfir(
4516                                        parse_quote! {
4517                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4518                                        },
4519                                        None,
4520                                        Some(&stmt_id.to_string()),
4521                                    );
4522                                } else {
4523                                    let builder = graph_builders.get_dfir_mut(&out_location);
4524                                    builder.add_dfir(
4525                                        parse_quote! {
4526                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4527                                        },
4528                                        None,
4529                                        Some(&stmt_id.to_string()),
4530                                    );
4531                                }
4532                            }
4533                            BuildersOrCallback::Callback(_, node_callback) => {
4534                                node_callback(node, next_stmt_id);
4535                            }
4536                        }
4537
4538                        ident_stack.push(fold_ident);
4539                    }
4540
4541                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4542                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4543                            if input.metadata().location_id.is_top_level()
4544                                && input.metadata().collection_kind.is_bounded()
4545                            {
4546                                parse_quote!(reduce_no_replay)
4547                            } else {
4548                                parse_quote!(reduce)
4549                            }
4550                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4551                            if input.metadata().location_id.is_top_level()
4552                                && input.metadata().collection_kind.is_bounded()
4553                            {
4554                                todo!(
4555                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4556                                )
4557                            } else {
4558                                parse_quote!(reduce_keyed)
4559                            }
4560                        } else {
4561                            unreachable!()
4562                        };
4563
4564                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4565                        else {
4566                            unreachable!()
4567                        };
4568
4569                        let lifetime = if input.metadata().location_id.is_top_level() {
4570                            quote!('static)
4571                        } else {
4572                            quote!('tick)
4573                        };
4574
4575                        let input_ident = ident_stack.pop().unwrap();
4576
4577                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4578                        else {
4579                            unreachable!()
4580                        };
4581
4582                        let f_tokens = f.emit_tokens(&mut ident_stack);
4583
4584                        let stmt_id = next_stmt_id.get_and_increment();
4585                        let reduce_ident =
4586                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4587
4588                        match builders_or_callback {
4589                            BuildersOrCallback::Builders(graph_builders) => {
4590                                if matches!(node, HydroNode::Reduce { .. })
4591                                    && node.metadata().location_id.is_top_level()
4592                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4593                                    && graph_builders.singleton_intermediates()
4594                                    && !node.metadata().collection_kind.is_bounded()
4595                                {
4596                                    todo!(
4597                                        "Reduce with optional intermediates is not yet supported in simulator"
4598                                    );
4599                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4600                                    && node.metadata().location_id.is_top_level()
4601                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4602                                    && graph_builders.singleton_intermediates()
4603                                    && !node.metadata().collection_kind.is_bounded()
4604                                {
4605                                    todo!(
4606                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4607                                    );
4608                                } else {
4609                                    let builder = graph_builders.get_dfir_mut(&out_location);
4610                                    builder.add_dfir(
4611                                        parse_quote! {
4612                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4613                                        },
4614                                        None,
4615                                        Some(&stmt_id.to_string()),
4616                                    );
4617                                }
4618                            }
4619                            BuildersOrCallback::Callback(_, node_callback) => {
4620                                node_callback(node, next_stmt_id);
4621                            }
4622                        }
4623
4624                        ident_stack.push(reduce_ident);
4625                    }
4626
4627                    HydroNode::ReduceKeyedWatermark {
4628                        f,
4629                        input,
4630                        metadata,
4631                        ..
4632                    } => {
4633                        let lifetime = if input.metadata().location_id.is_top_level() {
4634                            quote!('static)
4635                        } else {
4636                            quote!('tick)
4637                        };
4638
4639                        // watermark is processed second, so it's on top
4640                        let watermark_ident = ident_stack.pop().unwrap();
4641                        let input_ident = ident_stack.pop().unwrap();
4642                        let f_tokens = f.emit_tokens(&mut ident_stack);
4643
4644                        let stmt_id = next_stmt_id.get_and_increment();
4645                        let chain_ident = syn::Ident::new(
4646                            &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4647                            Span::call_site(),
4648                        );
4649
4650                        let fold_ident =
4651                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4652
4653                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4654                            && input.metadata().collection_kind.is_bounded()
4655                        {
4656                            parse_quote!(fold_no_replay)
4657                        } else {
4658                            parse_quote!(fold)
4659                        };
4660
4661                        match builders_or_callback {
4662                            BuildersOrCallback::Builders(graph_builders) => {
4663                                if metadata.location_id.is_top_level()
4664                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4665                                    && graph_builders.singleton_intermediates()
4666                                    && !metadata.collection_kind.is_bounded()
4667                                {
4668                                    todo!(
4669                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4670                                    )
4671                                } else {
4672                                    let builder = graph_builders.get_dfir_mut(&out_location);
4673                                    builder.add_dfir(
4674                                        parse_quote! {
4675                                            #chain_ident = chain();
4676                                            #input_ident
4677                                                -> map(|x| (Some(x), None))
4678                                                -> [0]#chain_ident;
4679                                            #watermark_ident
4680                                                -> map(|watermark| (None, Some(watermark)))
4681                                                -> [1]#chain_ident;
4682
4683                                            #fold_ident = #chain_ident
4684                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4685                                                    let __reduce_keyed_fn = #f_tokens;
4686                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4687                                                        if let Some((k, v)) = opt_payload {
4688                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4689                                                                if k < curr_watermark {
4690                                                                    return;
4691                                                                }
4692                                                            }
4693                                                            match map.entry(k) {
4694                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4695                                                                    e.insert(v);
4696                                                                }
4697                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4698                                                                    __reduce_keyed_fn(e.get_mut(), v);
4699                                                                }
4700                                                            }
4701                                                        } else {
4702                                                            let watermark = opt_watermark.unwrap();
4703                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4704                                                                if watermark <= curr_watermark {
4705                                                                    return;
4706                                                                }
4707                                                            }
4708                                                            map.retain(|k, _| *k >= watermark);
4709                                                            *opt_curr_watermark = Some(watermark);
4710                                                        }
4711                                                    }
4712                                                })
4713                                                -> flat_map(|(map, _curr_watermark)| map);
4714                                        },
4715                                        None,
4716                                        Some(&stmt_id.to_string()),
4717                                    );
4718                                }
4719                            }
4720                            BuildersOrCallback::Callback(_, node_callback) => {
4721                                node_callback(node, next_stmt_id);
4722                            }
4723                        }
4724
4725                        ident_stack.push(fold_ident);
4726                    }
4727
4728                    HydroNode::Network {
4729                        networking_info,
4730                        serialize_fn: serialize_pipeline,
4731                        instantiate_fn,
4732                        deserialize_fn: deserialize_pipeline,
4733                        input,
4734                        ..
4735                    } => {
4736                        let input_ident = ident_stack.pop().unwrap();
4737
4738                        let stmt_id = next_stmt_id.get_and_increment();
4739                        let receiver_stream_ident =
4740                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4741
4742                        match builders_or_callback {
4743                            BuildersOrCallback::Builders(graph_builders) => {
4744                                let (sink_expr, source_expr) = match instantiate_fn {
4745                                    DebugInstantiate::Building => (
4746                                        syn::parse_quote!(DUMMY_SINK),
4747                                        syn::parse_quote!(DUMMY_SOURCE),
4748                                    ),
4749
4750                                    DebugInstantiate::Finalized(finalized) => {
4751                                        (finalized.sink.clone(), finalized.source.clone())
4752                                    }
4753                                };
4754
4755                                graph_builders.create_network(
4756                                    &input.metadata().location_id,
4757                                    &out_location,
4758                                    input_ident,
4759                                    &receiver_stream_ident,
4760                                    serialize_pipeline.as_ref(),
4761                                    sink_expr,
4762                                    source_expr,
4763                                    deserialize_pipeline.as_ref(),
4764                                    stmt_id,
4765                                    networking_info,
4766                                );
4767                            }
4768                            BuildersOrCallback::Callback(_, node_callback) => {
4769                                node_callback(node, next_stmt_id);
4770                            }
4771                        }
4772
4773                        ident_stack.push(receiver_stream_ident);
4774                    }
4775
4776                    HydroNode::ExternalInput {
4777                        instantiate_fn,
4778                        deserialize_fn: deserialize_pipeline,
4779                        ..
4780                    } => {
4781                        let stmt_id = next_stmt_id.get_and_increment();
4782                        let receiver_stream_ident =
4783                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4784
4785                        match builders_or_callback {
4786                            BuildersOrCallback::Builders(graph_builders) => {
4787                                let (_, source_expr) = match instantiate_fn {
4788                                    DebugInstantiate::Building => (
4789                                        syn::parse_quote!(DUMMY_SINK),
4790                                        syn::parse_quote!(DUMMY_SOURCE),
4791                                    ),
4792
4793                                    DebugInstantiate::Finalized(finalized) => {
4794                                        (finalized.sink.clone(), finalized.source.clone())
4795                                    }
4796                                };
4797
4798                                graph_builders.create_external_source(
4799                                    &out_location,
4800                                    source_expr,
4801                                    &receiver_stream_ident,
4802                                    deserialize_pipeline.as_ref(),
4803                                    stmt_id,
4804                                );
4805                            }
4806                            BuildersOrCallback::Callback(_, node_callback) => {
4807                                node_callback(node, next_stmt_id);
4808                            }
4809                        }
4810
4811                        ident_stack.push(receiver_stream_ident);
4812                    }
4813
4814                    HydroNode::Counter {
4815                        tag,
4816                        duration,
4817                        prefix,
4818                        ..
4819                    } => {
4820                        let input_ident = ident_stack.pop().unwrap();
4821
4822                        let stmt_id = next_stmt_id.get_and_increment();
4823                        let counter_ident =
4824                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4825
4826                        match builders_or_callback {
4827                            BuildersOrCallback::Builders(graph_builders) => {
4828                                let arg = format!("{}({})", prefix, tag);
4829                                let builder = graph_builders.get_dfir_mut(&out_location);
4830                                builder.add_dfir(
4831                                    parse_quote! {
4832                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4833                                    },
4834                                    None,
4835                                    Some(&stmt_id.to_string()),
4836                                );
4837                            }
4838                            BuildersOrCallback::Callback(_, node_callback) => {
4839                                node_callback(node, next_stmt_id);
4840                            }
4841                        }
4842
4843                        ident_stack.push(counter_ident);
4844                    }
4845                }
4846            },
4847            seen_tees,
4848            false,
4849        );
4850
4851        let ret = ident_stack
4852            .pop()
4853            .expect("ident_stack should have exactly one element after traversal");
4854        assert!(
4855            ident_stack.is_empty(),
4856            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4857             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4858            ident_stack.len()
4859        );
4860        ret
4861    }
4862
4863    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4864        match self {
4865            HydroNode::Placeholder => {
4866                panic!()
4867            }
4868            HydroNode::Cast { .. }
4869            | HydroNode::ObserveNonDet { .. }
4870            | HydroNode::UnboundSingleton { .. }
4871            | HydroNode::AssertIsConsistent { .. } => {}
4872            HydroNode::Source { source, .. } => match source {
4873                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4874                HydroSource::ExternalNetwork()
4875                | HydroSource::Spin()
4876                | HydroSource::ClusterMembers(_, _)
4877                | HydroSource::Embedded(_)
4878                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4879            },
4880            HydroNode::SingletonSource { value, .. } => {
4881                transform(value);
4882            }
4883            HydroNode::CycleSource { .. }
4884            | HydroNode::Tee { .. }
4885            | HydroNode::Reference { .. }
4886            | HydroNode::YieldConcat { .. }
4887            | HydroNode::BeginAtomic { .. }
4888            | HydroNode::EndAtomic { .. }
4889            | HydroNode::Batch { .. }
4890            | HydroNode::Chain { .. }
4891            | HydroNode::MergeOrdered { .. }
4892            | HydroNode::ChainFirst { .. }
4893            | HydroNode::CrossProduct { .. }
4894            | HydroNode::CrossSingleton { .. }
4895            | HydroNode::ResolveFutures { .. }
4896            | HydroNode::ResolveFuturesBlocking { .. }
4897            | HydroNode::ResolveFuturesOrdered { .. }
4898            | HydroNode::Join { .. }
4899            | HydroNode::JoinHalf { .. }
4900            | HydroNode::Difference { .. }
4901            | HydroNode::AntiJoin { .. }
4902            | HydroNode::DeferTick { .. }
4903            | HydroNode::Enumerate { .. }
4904            | HydroNode::Unique { .. }
4905            | HydroNode::Sort { .. } => {}
4906            HydroNode::Map { f, .. }
4907            | HydroNode::FlatMap { f, .. }
4908            | HydroNode::FlatMapStreamBlocking { f, .. }
4909            | HydroNode::Filter { f, .. }
4910            | HydroNode::FilterMap { f, .. }
4911            | HydroNode::Inspect { f, .. }
4912            | HydroNode::Partition { f, .. }
4913            | HydroNode::Reduce { f, .. }
4914            | HydroNode::ReduceKeyed { f, .. }
4915            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4916                transform(&mut f.expr);
4917            }
4918            HydroNode::Fold { init, acc, .. }
4919            | HydroNode::Scan { init, acc, .. }
4920            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4921            | HydroNode::FoldKeyed { init, acc, .. } => {
4922                transform(&mut init.expr);
4923                transform(&mut acc.expr);
4924            }
4925            HydroNode::Network {
4926                serialize_fn,
4927                deserialize_fn,
4928                ..
4929            } => {
4930                if let Some(serialize_fn) = serialize_fn {
4931                    transform(serialize_fn);
4932                }
4933                if let Some(deserialize_fn) = deserialize_fn {
4934                    transform(deserialize_fn);
4935                }
4936            }
4937            HydroNode::ExternalInput { deserialize_fn, .. } => {
4938                if let Some(deserialize_fn) = deserialize_fn {
4939                    transform(deserialize_fn);
4940                }
4941            }
4942            HydroNode::Counter { duration, .. } => {
4943                transform(duration);
4944            }
4945        }
4946    }
4947
4948    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4949        &self.metadata().op
4950    }
4951
4952    pub fn metadata(&self) -> &HydroIrMetadata {
4953        match self {
4954            HydroNode::Placeholder => {
4955                panic!()
4956            }
4957            HydroNode::Cast { metadata, .. }
4958            | HydroNode::ObserveNonDet { metadata, .. }
4959            | HydroNode::AssertIsConsistent { metadata, .. }
4960            | HydroNode::UnboundSingleton { metadata, .. }
4961            | HydroNode::Source { metadata, .. }
4962            | HydroNode::SingletonSource { metadata, .. }
4963            | HydroNode::CycleSource { metadata, .. }
4964            | HydroNode::Tee { metadata, .. }
4965            | HydroNode::Reference { metadata, .. }
4966            | HydroNode::Partition { metadata, .. }
4967            | HydroNode::YieldConcat { metadata, .. }
4968            | HydroNode::BeginAtomic { metadata, .. }
4969            | HydroNode::EndAtomic { metadata, .. }
4970            | HydroNode::Batch { metadata, .. }
4971            | HydroNode::Chain { metadata, .. }
4972            | HydroNode::MergeOrdered { metadata, .. }
4973            | HydroNode::ChainFirst { metadata, .. }
4974            | HydroNode::CrossProduct { metadata, .. }
4975            | HydroNode::CrossSingleton { metadata, .. }
4976            | HydroNode::Join { metadata, .. }
4977            | HydroNode::JoinHalf { metadata, .. }
4978            | HydroNode::Difference { metadata, .. }
4979            | HydroNode::AntiJoin { metadata, .. }
4980            | HydroNode::ResolveFutures { metadata, .. }
4981            | HydroNode::ResolveFuturesBlocking { metadata, .. }
4982            | HydroNode::ResolveFuturesOrdered { metadata, .. }
4983            | HydroNode::Map { metadata, .. }
4984            | HydroNode::FlatMap { metadata, .. }
4985            | HydroNode::FlatMapStreamBlocking { metadata, .. }
4986            | HydroNode::Filter { metadata, .. }
4987            | HydroNode::FilterMap { metadata, .. }
4988            | HydroNode::DeferTick { metadata, .. }
4989            | HydroNode::Enumerate { metadata, .. }
4990            | HydroNode::Inspect { metadata, .. }
4991            | HydroNode::Unique { metadata, .. }
4992            | HydroNode::Sort { metadata, .. }
4993            | HydroNode::Scan { metadata, .. }
4994            | HydroNode::ScanAsyncBlocking { metadata, .. }
4995            | HydroNode::Fold { metadata, .. }
4996            | HydroNode::FoldKeyed { metadata, .. }
4997            | HydroNode::Reduce { metadata, .. }
4998            | HydroNode::ReduceKeyed { metadata, .. }
4999            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5000            | HydroNode::ExternalInput { metadata, .. }
5001            | HydroNode::Network { metadata, .. }
5002            | HydroNode::Counter { metadata, .. } => metadata,
5003        }
5004    }
5005
5006    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5007        &mut self.metadata_mut().op
5008    }
5009
5010    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5011        match self {
5012            HydroNode::Placeholder => {
5013                panic!()
5014            }
5015            HydroNode::Cast { metadata, .. }
5016            | HydroNode::ObserveNonDet { metadata, .. }
5017            | HydroNode::AssertIsConsistent { metadata, .. }
5018            | HydroNode::UnboundSingleton { metadata, .. }
5019            | HydroNode::Source { metadata, .. }
5020            | HydroNode::SingletonSource { metadata, .. }
5021            | HydroNode::CycleSource { metadata, .. }
5022            | HydroNode::Tee { metadata, .. }
5023            | HydroNode::Reference { metadata, .. }
5024            | HydroNode::Partition { metadata, .. }
5025            | HydroNode::YieldConcat { metadata, .. }
5026            | HydroNode::BeginAtomic { metadata, .. }
5027            | HydroNode::EndAtomic { metadata, .. }
5028            | HydroNode::Batch { metadata, .. }
5029            | HydroNode::Chain { metadata, .. }
5030            | HydroNode::MergeOrdered { metadata, .. }
5031            | HydroNode::ChainFirst { metadata, .. }
5032            | HydroNode::CrossProduct { metadata, .. }
5033            | HydroNode::CrossSingleton { metadata, .. }
5034            | HydroNode::Join { metadata, .. }
5035            | HydroNode::JoinHalf { metadata, .. }
5036            | HydroNode::Difference { metadata, .. }
5037            | HydroNode::AntiJoin { metadata, .. }
5038            | HydroNode::ResolveFutures { metadata, .. }
5039            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5040            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5041            | HydroNode::Map { metadata, .. }
5042            | HydroNode::FlatMap { metadata, .. }
5043            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5044            | HydroNode::Filter { metadata, .. }
5045            | HydroNode::FilterMap { metadata, .. }
5046            | HydroNode::DeferTick { metadata, .. }
5047            | HydroNode::Enumerate { metadata, .. }
5048            | HydroNode::Inspect { metadata, .. }
5049            | HydroNode::Unique { metadata, .. }
5050            | HydroNode::Sort { metadata, .. }
5051            | HydroNode::Scan { metadata, .. }
5052            | HydroNode::ScanAsyncBlocking { metadata, .. }
5053            | HydroNode::Fold { metadata, .. }
5054            | HydroNode::FoldKeyed { metadata, .. }
5055            | HydroNode::Reduce { metadata, .. }
5056            | HydroNode::ReduceKeyed { metadata, .. }
5057            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5058            | HydroNode::ExternalInput { metadata, .. }
5059            | HydroNode::Network { metadata, .. }
5060            | HydroNode::Counter { metadata, .. } => metadata,
5061        }
5062    }
5063
5064    pub fn input(&self) -> Vec<&HydroNode> {
5065        match self {
5066            HydroNode::Placeholder => {
5067                panic!()
5068            }
5069            HydroNode::Source { .. }
5070            | HydroNode::SingletonSource { .. }
5071            | HydroNode::ExternalInput { .. }
5072            | HydroNode::CycleSource { .. }
5073            | HydroNode::Tee { .. }
5074            | HydroNode::Reference { .. }
5075            | HydroNode::Partition { .. } => {
5076                // Tee/Partition should find their input in separate special ways
5077                vec![]
5078            }
5079            HydroNode::Cast { inner, .. }
5080            | HydroNode::ObserveNonDet { inner, .. }
5081            | HydroNode::YieldConcat { inner, .. }
5082            | HydroNode::BeginAtomic { inner, .. }
5083            | HydroNode::EndAtomic { inner, .. }
5084            | HydroNode::Batch { inner, .. }
5085            | HydroNode::UnboundSingleton { inner, .. }
5086            | HydroNode::AssertIsConsistent { inner, .. } => {
5087                vec![inner]
5088            }
5089            HydroNode::Chain { first, second, .. } => {
5090                vec![first, second]
5091            }
5092            HydroNode::MergeOrdered { first, second, .. } => {
5093                vec![first, second]
5094            }
5095            HydroNode::ChainFirst { first, second, .. } => {
5096                vec![first, second]
5097            }
5098            HydroNode::CrossProduct { left, right, .. }
5099            | HydroNode::CrossSingleton { left, right, .. }
5100            | HydroNode::Join { left, right, .. }
5101            | HydroNode::JoinHalf { left, right, .. } => {
5102                vec![left, right]
5103            }
5104            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5105                vec![pos, neg]
5106            }
5107            HydroNode::Map { input, .. }
5108            | HydroNode::FlatMap { input, .. }
5109            | HydroNode::FlatMapStreamBlocking { input, .. }
5110            | HydroNode::Filter { input, .. }
5111            | HydroNode::FilterMap { input, .. }
5112            | HydroNode::Sort { input, .. }
5113            | HydroNode::DeferTick { input, .. }
5114            | HydroNode::Enumerate { input, .. }
5115            | HydroNode::Inspect { input, .. }
5116            | HydroNode::Unique { input, .. }
5117            | HydroNode::Network { input, .. }
5118            | HydroNode::Counter { input, .. }
5119            | HydroNode::ResolveFutures { input, .. }
5120            | HydroNode::ResolveFuturesBlocking { input, .. }
5121            | HydroNode::ResolveFuturesOrdered { input, .. }
5122            | HydroNode::Fold { input, .. }
5123            | HydroNode::FoldKeyed { input, .. }
5124            | HydroNode::Reduce { input, .. }
5125            | HydroNode::ReduceKeyed { input, .. }
5126            | HydroNode::Scan { input, .. }
5127            | HydroNode::ScanAsyncBlocking { input, .. } => {
5128                vec![input]
5129            }
5130            HydroNode::ReduceKeyedWatermark {
5131                input, watermark, ..
5132            } => {
5133                vec![input, watermark]
5134            }
5135        }
5136    }
5137
5138    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5139        self.input()
5140            .iter()
5141            .map(|input_node| input_node.metadata())
5142            .collect()
5143    }
5144
5145    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5146    /// has other live references, meaning the upstream is already driven
5147    /// by another consumer and does not need a Null sink.
5148    pub fn is_shared_with_others(&self) -> bool {
5149        match self {
5150            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5151                Rc::strong_count(&inner.0) > 1
5152            }
5153            // A zero-output reference node is valid in DFIR (it drains itself at
5154            // end of tick), so it doesn't need to be driven by another consumer.
5155            HydroNode::Reference { .. } => false,
5156            _ => false,
5157        }
5158    }
5159
5160    pub fn print_root(&self) -> String {
5161        match self {
5162            HydroNode::Placeholder => {
5163                panic!()
5164            }
5165            HydroNode::Cast { .. } => "Cast()".to_owned(),
5166            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5167            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5168            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5169            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5170            HydroNode::SingletonSource {
5171                value,
5172                first_tick_only,
5173                ..
5174            } => format!(
5175                "SingletonSource({:?}, first_tick_only={})",
5176                value, first_tick_only
5177            ),
5178            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5179            HydroNode::Tee { inner, .. } => {
5180                format!("Tee({})", inner.0.borrow().print_root())
5181            }
5182            HydroNode::Reference { inner, kind, .. } => {
5183                format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5184            }
5185            HydroNode::Partition { f, is_true, .. } => {
5186                format!("Partition({:?}, is_true={})", f, is_true)
5187            }
5188            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5189            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5190            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5191            HydroNode::Batch { .. } => "Batch()".to_owned(),
5192            HydroNode::Chain { first, second, .. } => {
5193                format!("Chain({}, {})", first.print_root(), second.print_root())
5194            }
5195            HydroNode::MergeOrdered { first, second, .. } => {
5196                format!(
5197                    "MergeOrdered({}, {})",
5198                    first.print_root(),
5199                    second.print_root()
5200                )
5201            }
5202            HydroNode::ChainFirst { first, second, .. } => {
5203                format!(
5204                    "ChainFirst({}, {})",
5205                    first.print_root(),
5206                    second.print_root()
5207                )
5208            }
5209            HydroNode::CrossProduct { left, right, .. } => {
5210                format!(
5211                    "CrossProduct({}, {})",
5212                    left.print_root(),
5213                    right.print_root()
5214                )
5215            }
5216            HydroNode::CrossSingleton { left, right, .. } => {
5217                format!(
5218                    "CrossSingleton({}, {})",
5219                    left.print_root(),
5220                    right.print_root()
5221                )
5222            }
5223            HydroNode::Join { left, right, .. } => {
5224                format!("Join({}, {})", left.print_root(), right.print_root())
5225            }
5226            HydroNode::JoinHalf { left, right, .. } => {
5227                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5228            }
5229            HydroNode::Difference { pos, neg, .. } => {
5230                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5231            }
5232            HydroNode::AntiJoin { pos, neg, .. } => {
5233                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5234            }
5235            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5236            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5237            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5238            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5239            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5240            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5241            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5242            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5243            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5244            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5245            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5246            HydroNode::Unique { .. } => "Unique()".to_owned(),
5247            HydroNode::Sort { .. } => "Sort()".to_owned(),
5248            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5249            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5250            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5251                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5252            }
5253            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5254            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5255            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5256            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5257            HydroNode::Network { .. } => "Network()".to_owned(),
5258            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5259            HydroNode::Counter { tag, duration, .. } => {
5260                format!("Counter({:?}, {:?})", tag, duration)
5261            }
5262        }
5263    }
5264}
5265
5266#[cfg(feature = "build")]
5267fn instantiate_network<'a, D>(
5268    env: &mut D::InstantiateEnv,
5269    from_location: &LocationId,
5270    to_location: &LocationId,
5271    processes: &SparseSecondaryMap<LocationKey, D::Process>,
5272    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5273    name: Option<&str>,
5274    networking_info: &crate::networking::NetworkingInfo,
5275) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5276where
5277    D: Deploy<'a>,
5278{
5279    let ((sink, source), connect_fn) = match (from_location, to_location) {
5280        (&LocationId::Process(from), &LocationId::Process(to)) => {
5281            let from_node = processes
5282                .get(from)
5283                .unwrap_or_else(|| {
5284                    panic!("A process used in the graph was not instantiated: {}", from)
5285                })
5286                .clone();
5287            let to_node = processes
5288                .get(to)
5289                .unwrap_or_else(|| {
5290                    panic!("A process used in the graph was not instantiated: {}", to)
5291                })
5292                .clone();
5293
5294            let sink_port = from_node.next_port();
5295            let source_port = to_node.next_port();
5296
5297            (
5298                D::o2o_sink_source(
5299                    env,
5300                    &from_node,
5301                    &sink_port,
5302                    &to_node,
5303                    &source_port,
5304                    name,
5305                    networking_info,
5306                ),
5307                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5308            )
5309        }
5310        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5311            let from_node = processes
5312                .get(from)
5313                .unwrap_or_else(|| {
5314                    panic!("A process used in the graph was not instantiated: {}", from)
5315                })
5316                .clone();
5317            let to_node = clusters
5318                .get(to)
5319                .unwrap_or_else(|| {
5320                    panic!("A cluster used in the graph was not instantiated: {}", to)
5321                })
5322                .clone();
5323
5324            let sink_port = from_node.next_port();
5325            let source_port = to_node.next_port();
5326
5327            (
5328                D::o2m_sink_source(
5329                    env,
5330                    &from_node,
5331                    &sink_port,
5332                    &to_node,
5333                    &source_port,
5334                    name,
5335                    networking_info,
5336                ),
5337                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5338            )
5339        }
5340        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5341            let from_node = clusters
5342                .get(from)
5343                .unwrap_or_else(|| {
5344                    panic!("A cluster used in the graph was not instantiated: {}", from)
5345                })
5346                .clone();
5347            let to_node = processes
5348                .get(to)
5349                .unwrap_or_else(|| {
5350                    panic!("A process used in the graph was not instantiated: {}", to)
5351                })
5352                .clone();
5353
5354            let sink_port = from_node.next_port();
5355            let source_port = to_node.next_port();
5356
5357            (
5358                D::m2o_sink_source(
5359                    env,
5360                    &from_node,
5361                    &sink_port,
5362                    &to_node,
5363                    &source_port,
5364                    name,
5365                    networking_info,
5366                ),
5367                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5368            )
5369        }
5370        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5371            let from_node = clusters
5372                .get(from)
5373                .unwrap_or_else(|| {
5374                    panic!("A cluster used in the graph was not instantiated: {}", from)
5375                })
5376                .clone();
5377            let to_node = clusters
5378                .get(to)
5379                .unwrap_or_else(|| {
5380                    panic!("A cluster used in the graph was not instantiated: {}", to)
5381                })
5382                .clone();
5383
5384            let sink_port = from_node.next_port();
5385            let source_port = to_node.next_port();
5386
5387            (
5388                D::m2m_sink_source(
5389                    env,
5390                    &from_node,
5391                    &sink_port,
5392                    &to_node,
5393                    &source_port,
5394                    name,
5395                    networking_info,
5396                ),
5397                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5398            )
5399        }
5400        (LocationId::Tick(_, _), _) => panic!(),
5401        (_, LocationId::Tick(_, _)) => panic!(),
5402        (LocationId::Atomic(_), _) => panic!(),
5403        (_, LocationId::Atomic(_)) => panic!(),
5404    };
5405    (sink, source, connect_fn)
5406}
5407
5408#[cfg(test)]
5409mod serde_test;
5410
5411#[cfg(test)]
5412mod test {
5413    use std::mem::size_of;
5414
5415    use stageleft::{QuotedWithContext, q};
5416
5417    use super::*;
5418
5419    #[test]
5420    #[cfg_attr(
5421        not(feature = "build"),
5422        ignore = "expects inclusion of feature-gated fields"
5423    )]
5424    fn hydro_node_size() {
5425        assert_eq!(size_of::<HydroNode>(), 264);
5426    }
5427
5428    #[test]
5429    #[cfg_attr(
5430        not(feature = "build"),
5431        ignore = "expects inclusion of feature-gated fields"
5432    )]
5433    fn hydro_root_size() {
5434        assert_eq!(size_of::<HydroRoot>(), 136);
5435    }
5436
5437    #[test]
5438    fn test_simplify_q_macro_basic() {
5439        // Test basic non-q! expression
5440        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5441        let result = simplify_q_macro(simple_expr.clone());
5442        assert_eq!(result, simple_expr);
5443    }
5444
5445    #[test]
5446    fn test_simplify_q_macro_actual_stageleft_call() {
5447        // Test a simplified version of what a real stageleft call might look like
5448        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5449        let result = simplify_q_macro(stageleft_call);
5450        // This should be processed by our visitor and simplified to q!(...)
5451        // since we detect the stageleft::runtime_support::fn_* pattern
5452        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5453    }
5454
5455    #[test]
5456    fn test_closure_no_pipe_at_start() {
5457        // Test a closure that does not start with a pipe
5458        let stageleft_call = q!({
5459            let foo = 123;
5460            move |b: usize| b + foo
5461        })
5462        .splice_fn1_ctx(&());
5463        let result = simplify_q_macro(stageleft_call);
5464        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5465    }
5466}