Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use sealed::sealed;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_stream::KeyedStream;
15use super::optional::Optional;
16use super::singleton::Singleton;
17use super::sliced::sliced;
18use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::stream::{Ordering, Retries};
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::DeferTick;
30use crate::location::{Atomic, Location, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::properties::manual_proof;
34
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added,
/// removed, or changed), the implementors in this module are:
/// - [`BoundedValue`]: entries may be added over time, but once an entry is added it will never
///   be removed and its value will never change.
/// - [`MonotonicValue`]: keys are never removed and each value only increases monotonically.
/// - [`MonotonicKeys`]: keys are never removed, but each value may change arbitrarily.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The type of the keyed singleton if the value for each key is immutable.
    type WithBoundedValue: KeyedSingletonBound<
            UnderlyingBound = Self::UnderlyingBound,
            ValueBound = Bounded,
            EraseMonotonic = Self::WithBoundedValue,
        >;

    /// The bound of the keyed singleton produced from a [`KeyedStream`] with [`Self`]
    /// boundedness.
    ///
    /// NOTE(review): presumably the counterpart of [`Self::KeyedStreamToNonMonotone`] for
    /// aggregations that *do* come with a monotonicity proof; confirm against the call sites.
    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// The bound of the keyed singleton produced by folding a [`KeyedStream`] with
    /// [`Self`] boundedness when the aggregation does *not* have a monotonicity proof.
    ///
    /// Without a monotonicity proof, the per-key values may change arbitrarily, so an unbounded
    /// input collapses to [`MonotonicKeys`] (keys are still only added, never removed).
    type KeyedStreamToNonMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// The type of the keyed singleton if the value for each key is no longer monotonic.
    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
70
71impl KeyedSingletonBound for Unbounded {
72    type UnderlyingBound = Unbounded;
73    type ValueBound = Unbounded;
74    type WithBoundedValue = BoundedValue;
75    type KeyedStreamToMonotone = MonotonicValue;
76    type KeyedStreamToNonMonotone = MonotonicKeys;
77    type EraseMonotonic = Unbounded;
78
79    fn bound_kind() -> KeyedSingletonBoundKind {
80        KeyedSingletonBoundKind::Unbounded
81    }
82}
83
84impl KeyedSingletonBound for Bounded {
85    type UnderlyingBound = Bounded;
86    type ValueBound = Bounded;
87    type WithBoundedValue = Bounded;
88    type KeyedStreamToMonotone = Bounded;
89    type KeyedStreamToNonMonotone = Bounded;
90    type EraseMonotonic = Bounded;
91
92    fn bound_kind() -> KeyedSingletonBoundKind {
93        KeyedSingletonBoundKind::Bounded
94    }
95}
96
97/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
98/// its value is bounded and will never change, but new entries may appear asynchronously
99pub struct BoundedValue;
100
101impl KeyedSingletonBound for BoundedValue {
102    type UnderlyingBound = Unbounded;
103    type ValueBound = Bounded;
104    type WithBoundedValue = BoundedValue;
105    type KeyedStreamToMonotone = BoundedValue;
106    type KeyedStreamToNonMonotone = BoundedValue;
107    type EraseMonotonic = BoundedValue;
108
109    fn bound_kind() -> KeyedSingletonBoundKind {
110        KeyedSingletonBoundKind::BoundedValue
111    }
112}
113
114/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
115/// it will never be removed, and the corresponding value will only increase monotonically.
116pub struct MonotonicValue;
117
118impl KeyedSingletonBound for MonotonicValue {
119    type UnderlyingBound = Unbounded;
120    type ValueBound = Unbounded;
121    type WithBoundedValue = BoundedValue;
122    type KeyedStreamToMonotone = MonotonicValue;
123    type KeyedStreamToNonMonotone = MonotonicKeys;
124    type EraseMonotonic = MonotonicKeys;
125
126    fn bound_kind() -> KeyedSingletonBoundKind {
127        KeyedSingletonBoundKind::MonotonicValue
128    }
129}
130
131/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
132/// appears, it will never be removed, but the corresponding value may change arbitrarily.
133pub struct MonotonicKeys;
134
135impl KeyedSingletonBound for MonotonicKeys {
136    type UnderlyingBound = Unbounded;
137    type ValueBound = Unbounded;
138    type WithBoundedValue = BoundedValue;
139    type KeyedStreamToMonotone = MonotonicKeys;
140    type KeyedStreamToNonMonotone = MonotonicKeys;
141    type EraseMonotonic = MonotonicKeys;
142
143    fn bound_kind() -> KeyedSingletonBoundKind {
144        KeyedSingletonBoundKind::MonotonicKeys
145    }
146}
147
#[sealed]
#[diagnostic::on_unimplemented(
    message = "The keyed singleton must have monotonic values (`MonotonicValue`) or be bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for [`KeyedSingletonBound`] types whose per-key values
/// are monotonically non-decreasing (or bounded).
///
/// In this module it is implemented for [`MonotonicValue`], [`BoundedValue`], and every
/// bound that is [`IsBounded`]. The `on_unimplemented` attribute above customizes the
/// compiler error shown when a bound does not satisfy this trait.
pub trait IsKeyedMonotonic: KeyedSingletonBound {}
157
// Values only ever increase, which is exactly the monotonicity this marker asks for.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsKeyedMonotonic for MonotonicValue {}

// Values never change once a key appears, so they trivially never decrease.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsKeyedMonotonic for BoundedValue {}

// Blanket impl covering every keyed-singleton bound that is also `IsBounded`.
#[sealed]
#[diagnostic::do_not_recommend]
impl<B: IsBounded + KeyedSingletonBound> IsKeyedMonotonic for B {}
169
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values
/// of type `V`, where the order of keys is non-deterministic. In addition to the standard
/// boundedness variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously
/// changing), keyed singletons can use [`BoundedValue`] to declare that new keys may be added
/// over time, but keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
///     - [`MonotonicValue`] (asynchronous with no removals and monotonically increasing values)
///     - [`MonotonicKeys`] (asynchronous with no removals, values may change arbitrarily)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // The location where this keyed singleton is materialized.
    pub(crate) location: Loc,
    // The IR node backing this collection; swapped for `HydroNode::Placeholder` when an
    // operator consumes the collection.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Flow state handle used to register roots (see the `Drop` impl).
    pub(crate) flow_state: FlowState,

    // Carries the type parameters without storing any values of those types.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
193
194impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
195    fn drop(&mut self) {
196        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
197        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
198            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
199                input: Box::new(ir_node),
200                op_metadata: HydroIrOpMetadata::new(),
201            });
202        }
203    }
204}
205
206impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
207    for KeyedSingleton<K, V, Loc, Bound>
208{
209    fn clone(&self) -> Self {
210        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
211            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
212            *self.ir_node.borrow_mut() = HydroNode::Tee {
213                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
214                metadata: self.location.new_node_metadata(Self::collection_kind()),
215            };
216        }
217
218        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
219            KeyedSingleton {
220                location: self.location.clone(),
221                flow_state: self.flow_state.clone(),
222                ir_node: HydroNode::Tee {
223                    inner: SharedNode(inner.0.clone()),
224                    metadata: metadata.clone(),
225                }
226                .into(),
227                _phantom: PhantomData,
228            }
229        } else {
230            unreachable!()
231        }
232    }
233}
234
235impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
236    for KeyedSingleton<K, V, L, B>
237where
238    L: Location<'a>,
239{
240    type Location = L;
241
242    fn create_source(cycle_id: CycleId, location: L) -> Self {
243        KeyedSingleton {
244            flow_state: location.flow_state().clone(),
245            location: location.clone(),
246            ir_node: RefCell::new(HydroNode::CycleSource {
247                cycle_id,
248                metadata: location.new_node_metadata(Self::collection_kind()),
249            }),
250            _phantom: PhantomData,
251        }
252    }
253}
254
255impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
256where
257    L: Location<'a>,
258{
259    type Location = Tick<L>;
260
261    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262        KeyedSingleton::new(
263            location.clone(),
264            HydroNode::CycleSource {
265                cycle_id,
266                metadata: location.new_node_metadata(Self::collection_kind()),
267            },
268        )
269    }
270}
271
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Implements [`DeferTick`] by delegating to `KeyedSingleton::defer_tick`.
    ///
    /// NOTE(review): this relies on an inherent `defer_tick` method defined elsewhere in the
    /// file taking precedence over this trait method; otherwise the call would recurse.
    fn defer_tick(self) -> Self {
        KeyedSingleton::defer_tick(self)
    }
}
280
281impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
282    for KeyedSingleton<K, V, L, B>
283where
284    L: Location<'a>,
285{
286    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
287        assert_eq!(
288            Location::id(&self.location),
289            expected_location,
290            "locations do not match"
291        );
292        self.location
293            .flow_state()
294            .borrow_mut()
295            .push_root(HydroRoot::CycleSink {
296                cycle_id,
297                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
298                op_metadata: HydroIrOpMetadata::new(),
299            });
300    }
301}
302
303impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
304where
305    L: Location<'a>,
306{
307    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
308        assert_eq!(
309            Location::id(&self.location),
310            expected_location,
311            "locations do not match"
312        );
313        self.location
314            .flow_state()
315            .borrow_mut()
316            .push_root(HydroRoot::CycleSink {
317                cycle_id,
318                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
319                op_metadata: HydroIrOpMetadata::new(),
320            });
321    }
322}
323
324impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
325    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
326        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
327        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
328
329        let flow_state = location.flow_state().clone();
330        KeyedSingleton {
331            location,
332            flow_state,
333            ir_node: RefCell::new(ir_node),
334            _phantom: PhantomData,
335        }
336    }
337
338    /// Returns the [`Location`] where this keyed singleton is being materialized.
339    pub fn location(&self) -> &L {
340        &self.location
341    }
342
343    /// Weakens the consistency of this live collection to not guarantee any consistency across
344    /// cluster members (if this collection is on a cluster).
345    pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::DropConsistency, B>
346    where
347        L: Location<'a>,
348    {
349        if L::consistency()
350            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
351        {
352            // already no consistency
353            KeyedSingleton::new(
354                self.location.drop_consistency(),
355                self.ir_node.replace(HydroNode::Placeholder),
356            )
357        } else {
358            KeyedSingleton::new(
359                self.location.drop_consistency(),
360                HydroNode::Cast {
361                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
362                    metadata: self
363                        .location
364                        .drop_consistency()
365                        .new_node_metadata(
366                            KeyedSingleton::<K, V, L::DropConsistency, B>::collection_kind(),
367                        ),
368                },
369            )
370        }
371    }
372
373    /// Casts this live collection to have the consistency guarantees specified in the given
374    /// location type parameter. The developer must ensure that the strengthened consistency
375    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
376    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
377        self,
378        _proof: impl crate::properties::ConsistencyProof,
379    ) -> KeyedSingleton<K, V, L2, B>
380    where
381        L: Location<'a>,
382    {
383        if L::consistency() == L2::consistency() {
384            // already consistent
385            KeyedSingleton::new(
386                self.location.with_consistency_of(),
387                self.ir_node.replace(HydroNode::Placeholder),
388            )
389        } else {
390            KeyedSingleton::new(
391                self.location.with_consistency_of(),
392                HydroNode::AssertIsConsistent {
393                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
394                    trusted: false,
395                    metadata: self
396                        .location
397                        .clone()
398                        .with_consistency_of::<L2>()
399                        .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
400                },
401            )
402        }
403    }
404}
405
/// Counts the entries of a bounded keyed singleton.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
412
/// Collects the entries of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering_trusted(nondet!(
        /// There is only one element associated with each key. The closure technically
        /// isn't commutative in the case where both passed entries have the same key
        /// but different values.
        ///
        /// In the future, we may want to have an `assume!(...)` statement in the UDF that
        /// the key is never already present in the map.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
436
437impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
438    pub(crate) fn collection_kind() -> CollectionKind {
439        CollectionKind::KeyedSingleton {
440            bound: B::bound_kind(),
441            key_type: stageleft::quote_type::<K>().into(),
442            value_type: stageleft::quote_type::<V>().into(),
443        }
444    }
445
446    /// Transforms each value by invoking `f` on each element, with keys staying the same
447    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
448    ///
449    /// If you do not want to modify the stream and instead only want to view
450    /// each item use [`KeyedSingleton::inspect`] instead.
451    ///
452    /// # Example
453    /// ```rust
454    /// # #[cfg(feature = "deploy")] {
455    /// # use hydro_lang::prelude::*;
456    /// # use futures::StreamExt;
457    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
458    /// let keyed_singleton = // { 1: 2, 2: 4 }
459    /// # process
460    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
461    /// #     .into_keyed()
462    /// #     .first();
463    /// keyed_singleton.map(q!(|v| v + 1))
464    /// #   .entries()
465    /// # }, |mut stream| async move {
466    /// // { 1: 3, 2: 5 }
467    /// # let mut results = Vec::new();
468    /// # for _ in 0..2 {
469    /// #     results.push(stream.next().await.unwrap());
470    /// # }
471    /// # results.sort();
472    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
473    /// # }));
474    /// # }
475    /// ```
476    pub fn map<U, F>(
477        self,
478        f: impl IntoQuotedMut<'a, F, L> + Copy,
479    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
480    where
481        F: Fn(V) -> U + 'a,
482    {
483        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
484        let map_f = q!({
485            let orig = f;
486            move |(k, v)| (k, orig(v))
487        })
488        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
489        .into();
490
491        KeyedSingleton::new(
492            self.location.clone(),
493            HydroNode::Map {
494                f: map_f,
495                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
496                metadata: self.location.new_node_metadata(KeyedSingleton::<
497                    K,
498                    U,
499                    L,
500                    B::EraseMonotonic,
501                >::collection_kind()),
502            },
503        )
504    }
505
506    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
507    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
508    ///
509    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
510    /// the new value `U`. The key remains unchanged in the output.
511    ///
512    /// # Example
513    /// ```rust
514    /// # #[cfg(feature = "deploy")] {
515    /// # use hydro_lang::prelude::*;
516    /// # use futures::StreamExt;
517    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
518    /// let keyed_singleton = // { 1: 2, 2: 4 }
519    /// # process
520    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
521    /// #     .into_keyed()
522    /// #     .first();
523    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
524    /// #   .entries()
525    /// # }, |mut stream| async move {
526    /// // { 1: 3, 2: 6 }
527    /// # let mut results = Vec::new();
528    /// # for _ in 0..2 {
529    /// #     results.push(stream.next().await.unwrap());
530    /// # }
531    /// # results.sort();
532    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
533    /// # }));
534    /// # }
535    /// ```
536    pub fn map_with_key<U, F>(
537        self,
538        f: impl IntoQuotedMut<'a, F, L> + Copy,
539    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
540    where
541        F: Fn((K, V)) -> U + 'a,
542        K: Clone,
543    {
544        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
545        let map_f = q!({
546            let orig = f;
547            move |(k, v)| {
548                let out = orig((Clone::clone(&k), v));
549                (k, out)
550            }
551        })
552        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
553        .into();
554
555        KeyedSingleton::new(
556            self.location.clone(),
557            HydroNode::Map {
558                f: map_f,
559                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
560                metadata: self.location.new_node_metadata(KeyedSingleton::<
561                    K,
562                    U,
563                    L,
564                    B::EraseMonotonic,
565                >::collection_kind()),
566            },
567        )
568    }
569
570    /// Gets the number of keys in the keyed singleton.
571    ///
572    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
573    /// since keys may be added / removed over time. When the set of keys changes, the count will
574    /// be asynchronously updated.
575    ///
576    /// # Example
577    /// ```rust
578    /// # #[cfg(feature = "deploy")] {
579    /// # use hydro_lang::prelude::*;
580    /// # use futures::StreamExt;
581    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
582    /// # let tick = process.tick();
583    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
584    /// # process
585    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
586    /// #     .into_keyed()
587    /// #     .batch(&tick, nondet!(/** test */))
588    /// #     .first();
589    /// keyed_singleton.key_count()
590    /// # .all_ticks()
591    /// # }, |mut stream| async move {
592    /// // 3
593    /// # assert_eq!(stream.next().await.unwrap(), 3);
594    /// # }));
595    /// # }
596    /// ```
597    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
598        if B::ValueBound::BOUNDED {
599            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
600                location: self.location.clone(),
601                flow_state: self.flow_state.clone(),
602                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
603                _phantom: PhantomData,
604            };
605
606            me.entries().count().ignore_monotonic()
607        } else if L::is_top_level()
608            && let Some(tick) = self.location.try_tick()
609            && (B::bound_kind() == KeyedSingletonBoundKind::Unbounded
610                || B::bound_kind() == KeyedSingletonBoundKind::MonotonicKeys
611                || B::bound_kind() == KeyedSingletonBoundKind::MonotonicValue)
612        {
613            let location = self.location.clone();
614            let ir_node = self.ir_node.replace(HydroNode::Placeholder);
615            let me: KeyedSingleton<K, V, L, MonotonicKeys> =
616                KeyedSingleton::new(location.clone(), ir_node);
617
618            let out =
619                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
620                    .latest();
621            Singleton::new(location, out.ir_node.replace(HydroNode::Placeholder))
622        } else {
623            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
624        }
625    }
626
627    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
628    ///
629    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
630    /// asynchronously as well.
631    ///
632    /// # Example
633    /// ```rust
634    /// # #[cfg(feature = "deploy")] {
635    /// # use hydro_lang::prelude::*;
636    /// # use futures::StreamExt;
637    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
639    /// # process
640    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
641    /// #     .into_keyed()
642    /// #     .batch(&process.tick(), nondet!(/** test */))
643    /// #     .first();
644    /// keyed_singleton.into_singleton()
645    /// # .all_ticks()
646    /// # }, |mut stream| async move {
647    /// // { 1: "a", 2: "b", 3: "c" }
648    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
649    /// # }));
650    /// # }
651    /// ```
652    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
653    where
654        K: Eq + Hash,
655    {
656        if B::ValueBound::BOUNDED {
657            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
658                location: self.location.clone(),
659                flow_state: self.flow_state.clone(),
660                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
661                _phantom: PhantomData,
662            };
663
664            me.entries()
665                .assume_ordering_trusted(nondet!(
666                    /// There is only one element associated with each key. The closure technically
667                    /// isn't commutative in the case where both passed entries have the same key
668                    /// but different values.
669                    ///
670                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
671                    /// the key is never already present in the map.
672                ))
673                .fold(
674                    q!(|| HashMap::new()),
675                    q!(|map, (k, v)| {
676                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
677                        map.insert(k, v);
678                    }),
679                )
680        } else if L::is_top_level()
681            && let Some(tick) = self.location.try_tick()
682            && (B::bound_kind() == KeyedSingletonBoundKind::Unbounded
683                || B::bound_kind() == KeyedSingletonBoundKind::MonotonicKeys
684                || B::bound_kind() == KeyedSingletonBoundKind::MonotonicValue)
685        {
686            let location = self.location.clone();
687            let ir_node = self.ir_node.replace(HydroNode::Placeholder);
688            let me: KeyedSingleton<K, V, L, MonotonicKeys> =
689                KeyedSingleton::new(location.clone(), ir_node);
690
691            let out = into_singleton_inside_tick(
692                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693            )
694            .latest();
695            Singleton::new(location, out.ir_node.replace(HydroNode::Placeholder))
696        } else {
697            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
698        }
699    }
700
701    /// An operator which allows you to "name" a `HydroNode`.
702    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
703    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
704        {
705            let mut node = self.ir_node.borrow_mut();
706            let metadata = node.metadata_mut();
707            metadata.tag = Some(name.to_owned());
708        }
709        self
710    }
711
712    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
713    /// implies that `B == Bounded`.
714    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
715    where
716        B: IsBounded,
717    {
718        KeyedSingleton::new(
719            self.location.clone(),
720            self.ir_node.replace(HydroNode::Placeholder),
721        )
722    }
723
724    /// Gets the value associated with a specific key from the keyed singleton.
725    /// Returns `None` if the key is `None` or there is no associated value.
726    ///
727    /// # Example
728    /// ```rust
729    /// # #[cfg(feature = "deploy")] {
730    /// # use hydro_lang::prelude::*;
731    /// # use futures::StreamExt;
732    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
733    /// let tick = process.tick();
734    /// let keyed_data = process
735    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
736    ///     .into_keyed()
737    ///     .batch(&tick, nondet!(/** test */))
738    ///     .first();
739    /// let key = tick.singleton(q!(1));
740    /// keyed_data.get(key).all_ticks()
741    /// # }, |mut stream| async move {
742    /// // 2
743    /// # assert_eq!(stream.next().await.unwrap(), 2);
744    /// # }));
745    /// # }
746    /// ```
747    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
748    where
749        B: IsBounded,
750        K: Hash + Eq + Clone,
751        V: Clone,
752    {
753        self.make_bounded()
754            .into_keyed_stream()
755            .get(key)
756            .cast_at_most_one_element()
757    }
758
759    /// Emit a keyed stream containing keys shared between the keyed singleton and the
760    /// keyed stream, where each value in the output keyed stream is a tuple of
761    /// (the keyed singleton's value, the keyed stream's value).
762    ///
763    /// # Example
764    /// ```rust
765    /// # #[cfg(feature = "deploy")] {
766    /// # use hydro_lang::prelude::*;
767    /// # use futures::StreamExt;
768    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769    /// let tick = process.tick();
770    /// let keyed_data = process
771    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
772    ///     .into_keyed()
773    ///     .batch(&tick, nondet!(/** test */))
774    ///     .first();
775    /// let other_data = process
776    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
777    ///     .into_keyed()
778    ///     .batch(&tick, nondet!(/** test */));
779    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
780    /// # }, |mut stream| async move {
781    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
782    /// # let mut results = vec![];
783    /// # for _ in 0..3 {
784    /// #     results.push(stream.next().await.unwrap());
785    /// # }
786    /// # results.sort();
787    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
788    /// # }));
789    /// # }
790    /// ```
791    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
792        self,
793        other: KeyedStream<K, V2, L, B2, O2, R2>,
794    ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
795    where
796        B: IsBounded,
797        K: Eq + Hash + Clone,
798        V: Clone,
799        V2: Clone,
800    {
801        // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
802        // always produces deterministic order per key (nested loop join), this could just use
803        // `join_keyed_stream` without constructing IRs manually
804        KeyedStream::new(
805            self.location.clone(),
806            HydroNode::Join {
807                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
808                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
809                metadata: self
810                    .location
811                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
812            },
813        )
814    }
815
816    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
817    /// where each value in the output keyed singleton is a tuple of
818    /// (self.value, other.value).
819    ///
820    /// # Example
821    /// ```rust
822    /// # #[cfg(feature = "deploy")] {
823    /// # use hydro_lang::prelude::*;
824    /// # use futures::StreamExt;
825    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
826    /// # let tick = process.tick();
827    /// let requests = // { 1: 10, 2: 20, 3: 30 }
828    /// # process
829    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
830    /// #     .into_keyed()
831    /// #     .batch(&tick, nondet!(/** test */))
832    /// #     .first();
833    /// let other = // { 1: 100, 2: 200, 4: 400 }
834    /// # process
835    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
836    /// #     .into_keyed()
837    /// #     .batch(&tick, nondet!(/** test */))
838    /// #     .first();
839    /// requests.join_keyed_singleton(other)
840    /// # .entries().all_ticks()
841    /// # }, |mut stream| async move {
842    /// // { 1: (10, 100), 2: (20, 200) }
843    /// # let mut results = vec![];
844    /// # for _ in 0..2 {
845    /// #     results.push(stream.next().await.unwrap());
846    /// # }
847    /// # results.sort();
848    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
849    /// # }));
850    /// # }
851    /// ```
852    pub fn join_keyed_singleton<V2: Clone>(
853        self,
854        other: KeyedSingleton<K, V2, L, Bounded>,
855    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
856    where
857        B: IsBounded,
858        K: Eq + Hash + Clone,
859        V: Clone,
860    {
861        let result_stream = self
862            .make_bounded()
863            .entries()
864            .join(other.entries())
865            .into_keyed();
866
867        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
868        result_stream.cast_at_most_one_entry_per_key()
869    }
870
871    /// For each value in `self`, find the matching key in `lookup`.
872    /// The output is a keyed singleton with the key from `self`, and a value
873    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
874    /// If the key is not present in `lookup`, the option will be [`None`].
875    ///
876    /// # Example
877    /// ```rust
878    /// # #[cfg(feature = "deploy")] {
879    /// # use hydro_lang::prelude::*;
880    /// # use futures::StreamExt;
881    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
882    /// # let tick = process.tick();
883    /// let requests = // { 1: 10, 2: 20 }
884    /// # process
885    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
886    /// #     .into_keyed()
887    /// #     .batch(&tick, nondet!(/** test */))
888    /// #     .first();
889    /// let other_data = // { 10: 100, 11: 110 }
890    /// # process
891    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
892    /// #     .into_keyed()
893    /// #     .batch(&tick, nondet!(/** test */))
894    /// #     .first();
895    /// requests.lookup_keyed_singleton(other_data)
896    /// # .entries().all_ticks()
897    /// # }, |mut stream| async move {
898    /// // { 1: (10, Some(100)), 2: (20, None) }
899    /// # let mut results = vec![];
900    /// # for _ in 0..2 {
901    /// #     results.push(stream.next().await.unwrap());
902    /// # }
903    /// # results.sort();
904    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
905    /// # }));
906    /// # }
907    /// ```
908    pub fn lookup_keyed_singleton<V2>(
909        self,
910        lookup: KeyedSingleton<V, V2, L, Bounded>,
911    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
912    where
913        B: IsBounded,
914        K: Eq + Hash + Clone,
915        V: Eq + Hash + Clone,
916        V2: Clone,
917    {
918        let result_stream = self
919            .make_bounded()
920            .into_keyed_stream()
921            .lookup_keyed_stream(lookup.into_keyed_stream());
922
923        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
924        result_stream.cast_at_most_one_entry_per_key()
925    }
926
927    /// For each value in `self`, find the matching key in `lookup`.
928    /// The output is a keyed stream with the key from `self`, and a value
929    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
930    /// If the key is not present in `lookup`, the option will be [`None`].
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use futures::StreamExt;
937    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938    /// # let tick = process.tick();
939    /// let requests = // { 1: 10, 2: 20 }
940    /// # process
941    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
942    /// #     .into_keyed()
943    /// #     .batch(&tick, nondet!(/** test */))
944    /// #     .first();
    /// let other_data = // { 10: [100, 110] }
946    /// # process
947    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
948    /// #     .into_keyed()
949    /// #     .batch(&tick, nondet!(/** test */));
950    /// requests.lookup_keyed_stream(other_data)
951    /// # .entries().all_ticks()
952    /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: [(20, None)] }
954    /// # let mut results = vec![];
955    /// # for _ in 0..3 {
956    /// #     results.push(stream.next().await.unwrap());
957    /// # }
958    /// # results.sort();
959    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
960    /// # }));
961    /// # }
962    /// ```
963    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
964        self,
965        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
966    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
967    where
968        B: IsBounded,
969        K: Eq + Hash + Clone,
970        V: Eq + Hash + Clone,
971        V2: Clone,
972    {
973        self.make_bounded()
974            .entries()
975            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
976            .into_keyed()
977            .lookup_keyed_stream(lookup)
978    }
979
980    /// For each key present in both `self` and `thresholds`, emits a [`KeyedStream`] event the first
981    /// time that key's value becomes greater than or equal to the corresponding threshold value.
982    /// The emitted value for each key is the threshold value itself.
983    ///
984    /// This requires the keyed singleton to have monotonic values ([`MonotonicValue`] or [`Bounded`]),
985    /// because otherwise the threshold detection would be non-deterministic.
986    ///
987    /// The `thresholds` parameter is a [`BoundedValue`] keyed singleton mapping each key to its
988    /// threshold. Thresholds may arrive asynchronously (new keys appear over time), but once set
989    /// for a key, the threshold value is fixed. Late-arriving thresholds are checked against the
990    /// current snapshot value immediately.
991    ///
992    /// # Example
993    /// ```rust,ignore
994    /// use hydro_lang::prelude::*;
995    ///
996    /// // Given a monotonically increasing keyed singleton (e.g. from fold with monotone proof)
997    /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
998    ///     .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 is monotone */)));
999    ///
1000    /// // BoundedValue keyed singleton of thresholds (from .first())
1001    /// let thresholds = threshold_source.into_keyed().first();
1002    ///
1003    /// // Emits (key, threshold_value) the first time each key's value >= threshold
1004    /// let crossed = counts.threshold_greater_or_equal(thresholds);
1005    /// ```
1006    pub fn threshold_greater_or_equal(
1007        self,
1008        thresholds: KeyedSingleton<K, V, L, BoundedValue>,
1009    ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
1010    where
1011        K: Clone + Eq + Hash,
1012        V: Clone + PartialOrd,
1013        B: IsKeyedMonotonic,
1014    {
1015        let self_location = self.location.clone();
1016        match B::bound_kind() {
1017            KeyedSingletonBoundKind::Bounded => {
1018                // Bounded case: self is already fixed, just join and filter
1019                let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
1020                    self.location.clone(),
1021                    self.ir_node.replace(HydroNode::Placeholder),
1022                );
1023                let result = me
1024                    .entries()
1025                    .join(thresholds.entries())
1026                    .filter_map(q!(|(k, (val, thresh))| {
1027                        if val >= thresh {
1028                            Some((k, thresh))
1029                        } else {
1030                            None
1031                        }
1032                    }))
1033                    .into_keyed();
1034                KeyedStream::new(
1035                    result.location.clone(),
1036                    result.ir_node.replace(HydroNode::Placeholder),
1037                )
1038            }
1039            KeyedSingletonBoundKind::MonotonicValue => {
1040                let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
1041                    self.location.clone(),
1042                    self.ir_node.replace(HydroNode::Placeholder),
1043                );
1044
1045                let result = sliced! {
1046                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1047                    let thresh_snapshot =
1048                        use(thresholds, nondet!(/** thresholds are deterministic */));
1049                    let mut already_crossed =
1050                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1051
1052                    let joined = thresh_snapshot.entries().join(snapshot.entries());
1053                    let passed = joined
1054                        .filter(q!(|(_, (thresh, val))| *val >= *thresh))
1055                        .map(q!(|(k, (thresh, _))| (k, thresh)));
1056
1057                    let newly_crossed = passed.anti_join(already_crossed.clone());
1058                    already_crossed =
1059                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1060
1061                    newly_crossed.into_keyed()
1062                };
1063
1064                KeyedStream::new(
1065                    self_location,
1066                    result.ir_node.replace(HydroNode::Placeholder),
1067                )
1068            }
1069            KeyedSingletonBoundKind::BoundedValue => {
1070                let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
1071                    self.location.clone(),
1072                    self.ir_node.replace(HydroNode::Placeholder),
1073                );
1074
1075                let result = sliced! {
1076                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1077                    let thresh_snapshot =
1078                        use(thresholds, nondet!(/** thresholds are deterministic */));
1079                    let mut already_crossed =
1080                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1081
1082                    let joined = thresh_snapshot.entries().join(snapshot.entries());
1083                    let passed = joined
1084                        .filter(q!(|(_, (thresh, val))| *val >= *thresh))
1085                        .map(q!(|(k, (thresh, _))| (k, thresh)));
1086
1087                    let newly_crossed = passed.anti_join(already_crossed.clone());
1088                    already_crossed =
1089                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1090
1091                    newly_crossed.into_keyed()
1092                };
1093
1094                KeyedStream::new(
1095                    self_location,
1096                    result.ir_node.replace(HydroNode::Placeholder),
1097                )
1098            }
1099            _ => {
1100                unreachable!(
1101                    "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
1102                )
1103            }
1104        }
1105    }
1106
1107    /// Like [`Self::threshold_greater_or_equal`], but uses a single [`Singleton`] threshold
1108    /// shared across all keys. Emits a `(K, V)` event for each key the first time that key's
1109    /// value becomes >= the threshold. The emitted value is the threshold itself.
1110    ///
1111    /// Because the threshold is a [`Bounded`] singleton, it is a compile-time constant and
1112    /// does not carry ongoing memory cost.
1113    ///
1114    /// # Example
1115    /// ```rust
1116    /// # #[cfg(feature = "deploy")] {
1117    /// # use hydro_lang::prelude::*;
1118    /// # use futures::StreamExt;
1119    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1120    /// // A keyed singleton of per-key values (in practice often a monotone counter): { 1: 6, 2: 4 }
1121    /// let counts = process
1122    ///     .source_iter(q!(vec![(1, 6), (2, 4)]))
1123    ///     .into_keyed()
1124    ///     .first();
1125    ///
1126    /// // A single threshold value shared across all keys
1127    /// let threshold = process.singleton(q!(5));
1128    ///
1129    /// // Emits (key, threshold) the first time each key's value >= threshold
1130    /// counts.threshold_greater_or_equal_uniform(threshold)
1131    /// #   .entries()
1132    /// # }, |mut stream| async move {
1133    /// // { 1: 5 } -- key 1's value 6 >= 5, but key 2's value 4 < 5
1134    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1135    /// # }));
1136    /// # }
1137    /// ```
1138    pub fn threshold_greater_or_equal_uniform(
1139        self,
1140        threshold: Singleton<V, L, Bounded>,
1141    ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
1142    where
1143        K: Clone + Eq + Hash,
1144        V: Clone + PartialOrd,
1145        B: IsKeyedMonotonic,
1146    {
1147        let self_location = self.location.clone();
1148        match B::bound_kind() {
1149            KeyedSingletonBoundKind::Bounded => {
1150                let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
1151                    self.location.clone(),
1152                    self.ir_node.replace(HydroNode::Placeholder),
1153                );
1154                let result = me
1155                    .entries()
1156                    .cross_singleton(threshold)
1157                    .filter_map(q!(|((k, val), thresh)| {
1158                        if val >= thresh {
1159                            Some((k, thresh))
1160                        } else {
1161                            None
1162                        }
1163                    }))
1164                    .into_keyed();
1165                KeyedStream::new(
1166                    result.location.clone(),
1167                    result.ir_node.replace(HydroNode::Placeholder),
1168                )
1169            }
1170            KeyedSingletonBoundKind::MonotonicValue => {
1171                let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
1172                    self.location.clone(),
1173                    self.ir_node.replace(HydroNode::Placeholder),
1174                );
1175
1176                let result = sliced! {
1177                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1178                    let mut already_crossed =
1179                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1180
1181                    let tick = snapshot.location().clone();
1182                    let thresh_in_tick = threshold.clone_into_tick(&tick);
1183
1184                    let crossing = snapshot
1185                        .entries()
1186                        .cross_singleton(thresh_in_tick)
1187                        .filter_map(q!(|((k, val), thresh)| {
1188                            if val >= thresh {
1189                                Some((k, thresh))
1190                            } else {
1191                                None
1192                            }
1193                        }));
1194
1195                    let newly_crossed = crossing.anti_join(already_crossed.clone());
1196                    already_crossed =
1197                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1198
1199                    newly_crossed.into_keyed()
1200                };
1201
1202                KeyedStream::new(
1203                    self_location,
1204                    result.ir_node.replace(HydroNode::Placeholder),
1205                )
1206            }
1207            KeyedSingletonBoundKind::BoundedValue => {
1208                let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
1209                    self.location.clone(),
1210                    self.ir_node.replace(HydroNode::Placeholder),
1211                );
1212
1213                let result = sliced! {
1214                    let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1215                    let mut already_crossed =
1216                        use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1217
1218                    let tick = snapshot.location().clone();
1219                    let thresh_in_tick = threshold.clone_into_tick(&tick);
1220
1221                    let crossing = snapshot
1222                        .entries()
1223                        .cross_singleton(thresh_in_tick)
1224                        .filter_map(q!(|((k, val), thresh)| {
1225                            if val >= thresh {
1226                                Some((k, thresh))
1227                            } else {
1228                                None
1229                            }
1230                        }));
1231
1232                    let newly_crossed = crossing.anti_join(already_crossed.clone());
1233                    already_crossed =
1234                        already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1235
1236                    newly_crossed.into_keyed()
1237                };
1238
1239                KeyedStream::new(
1240                    self_location,
1241                    result.ir_node.replace(HydroNode::Placeholder),
1242                )
1243            }
1244            _ => {
1245                unreachable!(
1246                    "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
1247                )
1248            }
1249        }
1250    }
1251}
1252
1253impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
1254    KeyedSingleton<K, V, L, B>
1255{
1256    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
1257    ///
1258    /// The value for each key must be bounded, otherwise the resulting stream elements would be
1259    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1260    /// into the output.
1261    ///
1262    /// # Example
1263    /// ```rust
1264    /// # #[cfg(feature = "deploy")] {
1265    /// # use hydro_lang::prelude::*;
1266    /// # use futures::StreamExt;
1267    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1268    /// let keyed_singleton = // { 1: 2, 2: 4 }
1269    /// # process
1270    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1271    /// #     .into_keyed()
1272    /// #     .first();
1273    /// keyed_singleton.entries()
1274    /// # }, |mut stream| async move {
1275    /// // (1, 2), (2, 4) in any order
1276    /// # let mut results = Vec::new();
1277    /// # for _ in 0..2 {
1278    /// #     results.push(stream.next().await.unwrap());
1279    /// # }
1280    /// # results.sort();
1281    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1282    /// # }));
1283    /// # }
1284    /// ```
1285    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1286        self.into_keyed_stream().entries()
1287    }
1288
1289    /// Flattens the keyed singleton into an unordered stream of just the values.
1290    ///
1291    /// The value for each key must be bounded, otherwise the resulting stream elements would be
1292    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1293    /// into the output.
1294    ///
1295    /// # Example
1296    /// ```rust
1297    /// # #[cfg(feature = "deploy")] {
1298    /// # use hydro_lang::prelude::*;
1299    /// # use futures::StreamExt;
1300    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1301    /// let keyed_singleton = // { 1: 2, 2: 4 }
1302    /// # process
1303    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1304    /// #     .into_keyed()
1305    /// #     .first();
1306    /// keyed_singleton.values()
1307    /// # }, |mut stream| async move {
1308    /// // 2, 4 in any order
1309    /// # let mut results = Vec::new();
1310    /// # for _ in 0..2 {
1311    /// #     results.push(stream.next().await.unwrap());
1312    /// # }
1313    /// # results.sort();
1314    /// # assert_eq!(results, vec![2, 4]);
1315    /// # }));
1316    /// # }
1317    /// ```
1318    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1319        let map_f = q!(|(_, v)| v)
1320            .splice_fn1_ctx::<(K, V), V>(&self.location)
1321            .into();
1322
1323        Stream::new(
1324            self.location.clone(),
1325            HydroNode::Map {
1326                f: map_f,
1327                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1328                metadata: self.location.new_node_metadata(Stream::<
1329                    V,
1330                    L,
1331                    B::UnderlyingBound,
1332                    NoOrder,
1333                    ExactlyOnce,
1334                >::collection_kind()),
1335            },
1336        )
1337    }
1338
1339    /// Flattens the keyed singleton into an unordered stream of just the keys.
1340    ///
1341    /// The value for each key must be bounded, otherwise the removal of keys would result in
1342    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1343    /// into the output.
1344    ///
1345    /// # Example
1346    /// ```rust
1347    /// # #[cfg(feature = "deploy")] {
1348    /// # use hydro_lang::prelude::*;
1349    /// # use futures::StreamExt;
1350    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1351    /// let keyed_singleton = // { 1: 2, 2: 4 }
1352    /// # process
1353    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1354    /// #     .into_keyed()
1355    /// #     .first();
1356    /// keyed_singleton.keys()
1357    /// # }, |mut stream| async move {
1358    /// // 1, 2 in any order
1359    /// # let mut results = Vec::new();
1360    /// # for _ in 0..2 {
1361    /// #     results.push(stream.next().await.unwrap());
1362    /// # }
1363    /// # results.sort();
1364    /// # assert_eq!(results, vec![1, 2]);
1365    /// # }));
1366    /// # }
1367    /// ```
1368    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1369        self.entries().map(q!(|(k, _)| k))
1370    }
1371
1372    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1373    /// entries whose keys are not in the provided stream.
1374    ///
1375    /// # Example
1376    /// ```rust
1377    /// # #[cfg(feature = "deploy")] {
1378    /// # use hydro_lang::prelude::*;
1379    /// # use futures::StreamExt;
1380    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1381    /// let tick = process.tick();
1382    /// let keyed_singleton = // { 1: 2, 2: 4 }
1383    /// # process
1384    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1385    /// #     .into_keyed()
1386    /// #     .first()
1387    /// #     .batch(&tick, nondet!(/** test */));
1388    /// let keys_to_remove = process
1389    ///     .source_iter(q!(vec![1]))
1390    ///     .batch(&tick, nondet!(/** test */));
1391    /// keyed_singleton.filter_key_not_in(keys_to_remove)
1392    /// #   .entries().all_ticks()
1393    /// # }, |mut stream| async move {
1394    /// // { 2: 4 }
1395    /// # for w in vec![(2, 4)] {
1396    /// #     assert_eq!(stream.next().await.unwrap(), w);
1397    /// # }
1398    /// # }));
1399    /// # }
1400    /// ```
1401    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1402        self,
1403        other: Stream<K, L, Bounded, O2, R2>,
1404    ) -> Self
1405    where
1406        K: Hash + Eq,
1407    {
1408        check_matching_location(&self.location, &other.location);
1409
1410        KeyedSingleton::new(
1411            self.location.clone(),
1412            HydroNode::AntiJoin {
1413                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1414                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1415                metadata: self.location.new_node_metadata(Self::collection_kind()),
1416            },
1417        )
1418    }
1419
1420    /// An operator which allows you to "inspect" each value of a keyed singleton without
1421    /// modifying it. The closure `f` is called on a reference to each value. This is
1422    /// mainly useful for debugging, and should not be used to generate side-effects.
1423    ///
1424    /// # Example
1425    /// ```rust
1426    /// # #[cfg(feature = "deploy")] {
1427    /// # use hydro_lang::prelude::*;
1428    /// # use futures::StreamExt;
1429    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1430    /// let keyed_singleton = // { 1: 2, 2: 4 }
1431    /// # process
1432    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1433    /// #     .into_keyed()
1434    /// #     .first();
1435    /// keyed_singleton
1436    ///     .inspect(q!(|v| println!("{}", v)))
1437    /// #   .entries()
1438    /// # }, |mut stream| async move {
1439    /// // { 1: 2, 2: 4 }
1440    /// # for w in vec![(1, 2), (2, 4)] {
1441    /// #     assert_eq!(stream.next().await.unwrap(), w);
1442    /// # }
1443    /// # }));
1444    /// # }
1445    /// ```
1446    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1447    where
1448        F: Fn(&V) + 'a,
1449    {
1450        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1451        let inspect_f = q!({
1452            let orig = f;
1453            move |t: &(_, _)| orig(&t.1)
1454        })
1455        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1456        .into();
1457
1458        KeyedSingleton::new(
1459            self.location.clone(),
1460            HydroNode::Inspect {
1461                f: inspect_f,
1462                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1463                metadata: self.location.new_node_metadata(Self::collection_kind()),
1464            },
1465        )
1466    }
1467
1468    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1469    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1470    /// mainly useful for debugging, and should not be used to generate side-effects.
1471    ///
1472    /// # Example
1473    /// ```rust
1474    /// # #[cfg(feature = "deploy")] {
1475    /// # use hydro_lang::prelude::*;
1476    /// # use futures::StreamExt;
1477    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1478    /// let keyed_singleton = // { 1: 2, 2: 4 }
1479    /// # process
1480    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1481    /// #     .into_keyed()
1482    /// #     .first();
1483    /// keyed_singleton
1484    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1485    /// #   .entries()
1486    /// # }, |mut stream| async move {
1487    /// // { 1: 2, 2: 4 }
1488    /// # for w in vec![(1, 2), (2, 4)] {
1489    /// #     assert_eq!(stream.next().await.unwrap(), w);
1490    /// # }
1491    /// # }));
1492    /// # }
1493    /// ```
1494    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1495    where
1496        F: Fn(&(K, V)) + 'a,
1497    {
1498        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1499
1500        KeyedSingleton::new(
1501            self.location.clone(),
1502            HydroNode::Inspect {
1503                f: inspect_f,
1504                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1505                metadata: self.location.new_node_metadata(Self::collection_kind()),
1506            },
1507        )
1508    }
1509
1510    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1511    ///
1512    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1513    /// asynchronously updated if a new key is added that is higher than the previous max key.
1514    ///
1515    /// # Example
1516    /// ```rust
1517    /// # #[cfg(feature = "deploy")] {
1518    /// # use hydro_lang::prelude::*;
1519    /// # use futures::StreamExt;
1520    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1521    /// let tick = process.tick();
1522    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1523    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1524    /// #     .into_keyed()
1525    /// #     .first();
1526    /// keyed_singleton.get_max_key()
1527    /// # .sample_eager(nondet!(/** test */))
1528    /// # }, |mut stream| async move {
1529    /// // (2, 456)
1530    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1531    /// # }));
1532    /// # }
1533    /// ```
1534    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1535    where
1536        K: Ord,
1537    {
1538        self.entries()
1539            .assume_ordering_trusted(nondet!(
1540                /// There is only one element associated with each key, and the keys are totallly
1541                /// ordered so we will produce a deterministic value. The closure technically
1542                /// isn't commutative in the case where both passed entries have the same key
1543                /// but different values.
1544                ///
1545                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1546                /// the two inputs do not have the same key.
1547            ))
1548            .reduce(q!(
1549                move |curr, new| {
1550                    if new.0 > curr.0 {
1551                        *curr = new;
1552                    }
1553                },
1554                idempotent = manual_proof!(/** repeated elements are ignored */)
1555            ))
1556    }
1557
1558    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1559    /// element, the value.
1560    ///
1561    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1562    ///
1563    /// # Example
1564    /// ```rust
1565    /// # #[cfg(feature = "deploy")] {
1566    /// # use hydro_lang::prelude::*;
1567    /// # use futures::StreamExt;
1568    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1569    /// let keyed_singleton = // { 1: 2, 2: 4 }
1570    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1571    /// #     .into_keyed()
1572    /// #     .first();
1573    /// keyed_singleton
1574    ///     .clone()
1575    ///     .into_keyed_stream()
1576    ///     .merge_unordered(
1577    ///         keyed_singleton.into_keyed_stream()
1578    ///     )
1579    /// #   .entries()
1580    /// # }, |mut stream| async move {
1581    /// /// // { 1: [2, 2], 2: [4, 4] }
1582    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1583    /// #     assert_eq!(stream.next().await.unwrap(), w);
1584    /// # }
1585    /// # }));
1586    /// # }
1587    /// ```
1588    pub fn into_keyed_stream(
1589        self,
1590    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1591        KeyedStream::new(
1592            self.location.clone(),
1593            HydroNode::Cast {
1594                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1595                metadata: self.location.new_node_metadata(KeyedStream::<
1596                    K,
1597                    V,
1598                    L,
1599                    B::UnderlyingBound,
1600                    TotalOrder,
1601                    ExactlyOnce,
1602                >::collection_kind()),
1603            },
1604        )
1605    }
1606}
1607
1608impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1609where
1610    L: Location<'a>,
1611{
1612    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1613    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1614    ///
1615    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1616    /// processed before an acknowledgement is emitted.
1617    pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1618        let id = self.location.flow_state().borrow_mut().next_clock_id();
1619        let out_location = Atomic {
1620            tick: Tick {
1621                id,
1622                l: self.location.clone(),
1623            },
1624        };
1625        KeyedSingleton::new(
1626            out_location.clone(),
1627            HydroNode::BeginAtomic {
1628                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1629                metadata: out_location
1630                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1631            },
1632        )
1633    }
1634}
1635
1636impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1637where
1638    L: Location<'a>,
1639{
1640    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1641    /// See [`KeyedSingleton::atomic`] for more details.
1642    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1643        KeyedSingleton::new(
1644            self.location.tick.l.clone(),
1645            HydroNode::EndAtomic {
1646                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1647                metadata: self
1648                    .location
1649                    .tick
1650                    .l
1651                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1652            },
1653        )
1654    }
1655}
1656
1657impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1658    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1659    /// tick `T` always has the entries of `self` at tick `T - 1`.
1660    ///
1661    /// At tick `0`, the output has no entries, since there is no previous tick.
1662    ///
1663    /// This operator enables stateful iterative processing with ticks, by sending data from one
1664    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1665    ///
1666    /// # Example
1667    /// ```rust
1668    /// # #[cfg(feature = "deploy")] {
1669    /// # use hydro_lang::prelude::*;
1670    /// # use futures::StreamExt;
1671    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1672    /// let tick = process.tick();
1673    /// # // ticks are lazy by default, forces the second tick to run
1674    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1675    /// # let batch_first_tick = process
1676    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1677    /// #   .batch(&tick, nondet!(/** test */))
1678    /// #   .into_keyed();
1679    /// # let batch_second_tick = process
1680    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1681    /// #   .batch(&tick, nondet!(/** test */))
1682    /// #   .into_keyed()
1683    /// #   .defer_tick(); // appears on the second tick
1684    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1685    /// # batch_first_tick.chain(batch_second_tick).first();
1686    /// input_batch.clone().filter_key_not_in(
1687    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1688    /// )
1689    /// # .entries().all_ticks()
1690    /// # }, |mut stream| async move {
1691    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1692    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1693    /// #     assert_eq!(stream.next().await.unwrap(), w);
1694    /// # }
1695    /// # }));
1696    /// # }
1697    /// ```
1698    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1699        KeyedSingleton::new(
1700            self.location.clone(),
1701            HydroNode::DeferTick {
1702                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1703                metadata: self
1704                    .location
1705                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1706            },
1707        )
1708    }
1709}
1710
1711impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1712where
1713    L: Location<'a>,
1714{
1715    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1716    /// point in time.
1717    ///
1718    /// # Non-Determinism
1719    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1720    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1721    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1722        self,
1723        tick: &Tick<L2>,
1724        _nondet: NonDet,
1725    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1726        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1727        KeyedSingleton::new(
1728            tick.drop_consistency(),
1729            HydroNode::Batch {
1730                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1731                metadata: tick
1732                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1733            },
1734        )
1735    }
1736}
1737
1738impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1739where
1740    L: Location<'a>,
1741{
1742    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1743    /// state of the keyed singleton being atomically processed.
1744    ///
1745    /// # Non-Determinism
1746    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1747    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1748    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1749        self,
1750        tick: &Tick<L2>,
1751        _nondet: NonDet,
1752    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1753        KeyedSingleton::new(
1754            tick.drop_consistency(),
1755            HydroNode::Batch {
1756                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1757                metadata: tick
1758                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1759            },
1760        )
1761    }
1762}
1763
1764impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1765where
1766    L: Location<'a>,
1767{
1768    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1769    ///
1770    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1771    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1772    /// is filtered out.
1773    ///
1774    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1775    /// not modify or take ownership of the values. If you need to modify the values while filtering
1776    /// use [`KeyedSingleton::filter_map`] instead.
1777    ///
1778    /// # Example
1779    /// ```rust
1780    /// # #[cfg(feature = "deploy")] {
1781    /// # use hydro_lang::prelude::*;
1782    /// # use futures::StreamExt;
1783    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1784    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1785    /// # process
1786    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1787    /// #     .into_keyed()
1788    /// #     .first();
1789    /// keyed_singleton.filter(q!(|&v| v > 1))
1790    /// #   .entries()
1791    /// # }, |mut stream| async move {
1792    /// // { 1: 2, 2: 4 }
1793    /// # let mut results = Vec::new();
1794    /// # for _ in 0..2 {
1795    /// #     results.push(stream.next().await.unwrap());
1796    /// # }
1797    /// # results.sort();
1798    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1799    /// # }));
1800    /// # }
1801    /// ```
1802    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1803    where
1804        F: Fn(&V) -> bool + 'a,
1805    {
1806        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1807        let filter_f = q!({
1808            let orig = f;
1809            move |t: &(_, _)| orig(&t.1)
1810        })
1811        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1812        .into();
1813
1814        KeyedSingleton::new(
1815            self.location.clone(),
1816            HydroNode::Filter {
1817                f: filter_f,
1818                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1819                metadata: self
1820                    .location
1821                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1822            },
1823        )
1824    }
1825
1826    /// An operator that both filters and maps values. It yields only the key-value pairs where
1827    /// the supplied closure `f` returns `Some(value)`.
1828    ///
1829    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1830    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1831    /// If it returns `None`, the key-value pair is filtered out.
1832    ///
1833    /// # Example
1834    /// ```rust
1835    /// # #[cfg(feature = "deploy")] {
1836    /// # use hydro_lang::prelude::*;
1837    /// # use futures::StreamExt;
1838    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1839    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1840    /// # process
1841    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1842    /// #     .into_keyed()
1843    /// #     .first();
1844    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1845    /// #   .entries()
1846    /// # }, |mut stream| async move {
1847    /// // { 1: 42, 3: 100 }
1848    /// # let mut results = Vec::new();
1849    /// # for _ in 0..2 {
1850    /// #     results.push(stream.next().await.unwrap());
1851    /// # }
1852    /// # results.sort();
1853    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1854    /// # }));
1855    /// # }
1856    /// ```
1857    pub fn filter_map<F, U>(
1858        self,
1859        f: impl IntoQuotedMut<'a, F, L> + Copy,
1860    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1861    where
1862        F: Fn(V) -> Option<U> + 'a,
1863    {
1864        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1865        let filter_map_f = q!({
1866            let orig = f;
1867            move |(k, v)| orig(v).map(|o| (k, o))
1868        })
1869        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1870        .into();
1871
1872        KeyedSingleton::new(
1873            self.location.clone(),
1874            HydroNode::FilterMap {
1875                f: filter_map_f,
1876                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1877                metadata: self.location.new_node_metadata(KeyedSingleton::<
1878                    K,
1879                    U,
1880                    L,
1881                    B::EraseMonotonic,
1882                >::collection_kind()),
1883            },
1884        )
1885    }
1886
1887    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1888    /// arrived since the previous batch was released.
1889    ///
1890    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1891    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1892    ///
1893    /// # Non-Determinism
1894    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1895    /// has a non-deterministic set of key-value pairs.
1896    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1897        self,
1898        tick: &Tick<L2>,
1899        _nondet: NonDet,
1900    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1901        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1902        KeyedSingleton::new(
1903            tick.drop_consistency(),
1904            HydroNode::Batch {
1905                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1906                metadata: tick
1907                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1908            },
1909        )
1910    }
1911}
1912
1913impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1914where
1915    L: Location<'a>,
1916{
1917    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1918    /// atomically processed.
1919    ///
1920    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1921    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1922    ///
1923    /// # Non-Determinism
1924    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1925    /// has a non-deterministic set of key-value pairs.
1926    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1927        self,
1928        tick: &Tick<L2>,
1929        nondet: NonDet,
1930    ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1931        let _ = nondet;
1932        KeyedSingleton::new(
1933            tick.drop_consistency(),
1934            HydroNode::Batch {
1935                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1936                metadata: tick
1937                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1938            },
1939        )
1940    }
1941}
1942
1943#[cfg(test)]
1944mod tests {
1945    #[cfg(feature = "deploy")]
1946    use futures::{SinkExt, StreamExt};
1947    #[cfg(feature = "deploy")]
1948    use hydro_deploy::Deployment;
1949    #[cfg(any(feature = "deploy", feature = "sim"))]
1950    use stageleft::q;
1951
1952    #[cfg(any(feature = "deploy", feature = "sim"))]
1953    use crate::compile::builder::FlowBuilder;
1954    #[cfg(any(feature = "deploy", feature = "sim"))]
1955    use crate::location::Location;
1956    #[cfg(any(feature = "deploy", feature = "sim"))]
1957    use crate::nondet::nondet;
1958
1959    #[cfg(feature = "deploy")]
1960    #[tokio::test]
1961    async fn key_count_bounded_value() {
1962        let mut deployment = Deployment::new();
1963
1964        let mut flow = FlowBuilder::new();
1965        let node = flow.process::<()>();
1966        let external = flow.external::<()>();
1967
1968        let (input_port, input) = node.source_external_bincode(&external);
1969        let out = input
1970            .into_keyed()
1971            .first()
1972            .key_count()
1973            .sample_eager(nondet!(/** test */))
1974            .send_bincode_external(&external);
1975
1976        let nodes = flow
1977            .with_process(&node, deployment.Localhost())
1978            .with_external(&external, deployment.Localhost())
1979            .deploy(&mut deployment);
1980
1981        deployment.deploy().await.unwrap();
1982
1983        let mut external_in = nodes.connect(input_port).await;
1984        let mut external_out = nodes.connect(out).await;
1985
1986        deployment.start().await.unwrap();
1987
1988        assert_eq!(external_out.next().await.unwrap(), 0);
1989
1990        external_in.send((1, 1)).await.unwrap();
1991        assert_eq!(external_out.next().await.unwrap(), 1);
1992
1993        external_in.send((2, 2)).await.unwrap();
1994        assert_eq!(external_out.next().await.unwrap(), 2);
1995    }
1996
1997    #[cfg(feature = "deploy")]
1998    #[tokio::test]
1999    async fn key_count_unbounded_value() {
2000        let mut deployment = Deployment::new();
2001
2002        let mut flow = FlowBuilder::new();
2003        let node = flow.process::<()>();
2004        let external = flow.external::<()>();
2005
2006        let (input_port, input) = node.source_external_bincode(&external);
2007        let out = input
2008            .into_keyed()
2009            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
2010            .key_count()
2011            .sample_eager(nondet!(/** test */))
2012            .send_bincode_external(&external);
2013
2014        let nodes = flow
2015            .with_process(&node, deployment.Localhost())
2016            .with_external(&external, deployment.Localhost())
2017            .deploy(&mut deployment);
2018
2019        deployment.deploy().await.unwrap();
2020
2021        let mut external_in = nodes.connect(input_port).await;
2022        let mut external_out = nodes.connect(out).await;
2023
2024        deployment.start().await.unwrap();
2025
2026        assert_eq!(external_out.next().await.unwrap(), 0);
2027
2028        external_in.send((1, 1)).await.unwrap();
2029        assert_eq!(external_out.next().await.unwrap(), 1);
2030
2031        external_in.send((1, 2)).await.unwrap();
2032        assert_eq!(external_out.next().await.unwrap(), 1);
2033
2034        external_in.send((2, 2)).await.unwrap();
2035        assert_eq!(external_out.next().await.unwrap(), 2);
2036
2037        external_in.send((1, 1)).await.unwrap();
2038        assert_eq!(external_out.next().await.unwrap(), 2);
2039
2040        external_in.send((3, 1)).await.unwrap();
2041        assert_eq!(external_out.next().await.unwrap(), 3);
2042    }
2043
2044    #[cfg(feature = "deploy")]
2045    #[tokio::test]
2046    async fn into_singleton_bounded_value() {
2047        let mut deployment = Deployment::new();
2048
2049        let mut flow = FlowBuilder::new();
2050        let node = flow.process::<()>();
2051        let external = flow.external::<()>();
2052
2053        let (input_port, input) = node.source_external_bincode(&external);
2054        let out = input
2055            .into_keyed()
2056            .first()
2057            .into_singleton()
2058            .sample_eager(nondet!(/** test */))
2059            .send_bincode_external(&external);
2060
2061        let nodes = flow
2062            .with_process(&node, deployment.Localhost())
2063            .with_external(&external, deployment.Localhost())
2064            .deploy(&mut deployment);
2065
2066        deployment.deploy().await.unwrap();
2067
2068        let mut external_in = nodes.connect(input_port).await;
2069        let mut external_out = nodes.connect(out).await;
2070
2071        deployment.start().await.unwrap();
2072
2073        assert_eq!(
2074            external_out.next().await.unwrap(),
2075            std::collections::HashMap::new()
2076        );
2077
2078        external_in.send((1, 1)).await.unwrap();
2079        assert_eq!(
2080            external_out.next().await.unwrap(),
2081            vec![(1, 1)].into_iter().collect()
2082        );
2083
2084        external_in.send((2, 2)).await.unwrap();
2085        assert_eq!(
2086            external_out.next().await.unwrap(),
2087            vec![(1, 1), (2, 2)].into_iter().collect()
2088        );
2089    }
2090
2091    #[cfg(feature = "deploy")]
2092    #[tokio::test]
2093    async fn into_singleton_unbounded_value() {
2094        let mut deployment = Deployment::new();
2095
2096        let mut flow = FlowBuilder::new();
2097        let node = flow.process::<()>();
2098        let external = flow.external::<()>();
2099
2100        let (input_port, input) = node.source_external_bincode(&external);
2101        let out = input
2102            .into_keyed()
2103            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
2104            .into_singleton()
2105            .sample_eager(nondet!(/** test */))
2106            .send_bincode_external(&external);
2107
2108        let nodes = flow
2109            .with_process(&node, deployment.Localhost())
2110            .with_external(&external, deployment.Localhost())
2111            .deploy(&mut deployment);
2112
2113        deployment.deploy().await.unwrap();
2114
2115        let mut external_in = nodes.connect(input_port).await;
2116        let mut external_out = nodes.connect(out).await;
2117
2118        deployment.start().await.unwrap();
2119
2120        assert_eq!(
2121            external_out.next().await.unwrap(),
2122            std::collections::HashMap::new()
2123        );
2124
2125        external_in.send((1, 1)).await.unwrap();
2126        assert_eq!(
2127            external_out.next().await.unwrap(),
2128            vec![(1, 1)].into_iter().collect()
2129        );
2130
2131        external_in.send((1, 2)).await.unwrap();
2132        assert_eq!(
2133            external_out.next().await.unwrap(),
2134            vec![(1, 2)].into_iter().collect()
2135        );
2136
2137        external_in.send((2, 2)).await.unwrap();
2138        assert_eq!(
2139            external_out.next().await.unwrap(),
2140            vec![(1, 2), (2, 1)].into_iter().collect()
2141        );
2142
2143        external_in.send((1, 1)).await.unwrap();
2144        assert_eq!(
2145            external_out.next().await.unwrap(),
2146            vec![(1, 3), (2, 1)].into_iter().collect()
2147        );
2148
2149        external_in.send((3, 1)).await.unwrap();
2150        assert_eq!(
2151            external_out.next().await.unwrap(),
2152            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
2153        );
2154    }
2155
2156    #[cfg(feature = "sim")]
2157    #[test]
2158    fn sim_unbounded_singleton_snapshot() {
2159        let mut flow = FlowBuilder::new();
2160        let node = flow.process::<()>();
2161
2162        let (input_port, input) = node.sim_input();
2163        let output = input
2164            .into_keyed()
2165            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
2166            .snapshot(&node.tick(), nondet!(/** test */))
2167            .entries()
2168            .all_ticks()
2169            .sim_output();
2170
2171        let count = flow.sim().exhaustive(async || {
2172            input_port.send((1, 123));
2173            input_port.send((1, 456));
2174            input_port.send((2, 123));
2175
2176            let all = output.collect_sorted::<Vec<_>>().await;
2177            assert_eq!(all.last().unwrap(), &(2, 1));
2178        });
2179
2180        assert_eq!(count, 8);
2181    }
2182
2183    #[cfg(feature = "deploy")]
2184    #[tokio::test]
2185    async fn join_keyed_stream() {
2186        let mut deployment = Deployment::new();
2187
2188        let mut flow = FlowBuilder::new();
2189        let node = flow.process::<()>();
2190        let external = flow.external::<()>();
2191
2192        let tick = node.tick();
2193        let keyed_data = node
2194            .source_iter(q!(vec![(1, 10), (2, 20)]))
2195            .into_keyed()
2196            .batch(&tick, nondet!(/** test */))
2197            .first();
2198        let requests = node
2199            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
2200            .into_keyed()
2201            .batch(&tick, nondet!(/** test */));
2202
2203        let out = keyed_data
2204            .join_keyed_stream(requests)
2205            .entries()
2206            .all_ticks()
2207            .send_bincode_external(&external);
2208
2209        let nodes = flow
2210            .with_process(&node, deployment.Localhost())
2211            .with_external(&external, deployment.Localhost())
2212            .deploy(&mut deployment);
2213
2214        deployment.deploy().await.unwrap();
2215
2216        let mut external_out = nodes.connect(out).await;
2217
2218        deployment.start().await.unwrap();
2219
2220        let mut results = vec![];
2221        for _ in 0..2 {
2222            results.push(external_out.next().await.unwrap());
2223        }
2224        results.sort();
2225
2226        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
2227    }
2228
2229    #[cfg(feature = "sim")]
2230    #[test]
2231    fn threshold_greater_or_equal_monotonic() {
2232        let mut flow = FlowBuilder::new();
2233        let node = flow.process::<()>();
2234
2235        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2236        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
2237
2238        // Create a monotonically increasing keyed singleton via fold with monotone proof
2239        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
2240            input.into_keyed().fold(
2241                q!(|| 0usize),
2242                q!(
2243                    |acc, v| *acc += v,
2244                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
2245                ),
2246            );
2247
2248        // BoundedValue keyed singleton of thresholds (from .first() on unbounded stream)
2249        let thresholds = thresh_input.into_keyed().first();
2250
2251        let output = counts
2252            .threshold_greater_or_equal(thresholds)
2253            .entries()
2254            .sim_output();
2255
2256        let count = flow.sim().exhaustive(async || {
2257            // Set thresholds: key 1 needs value >= 5, key 2 needs value >= 10
2258            thresh_port.send((1, 5));
2259            thresh_port.send((2, 10));
2260
2261            // key 1 gets increments: 3 + 3 = 6, which is >= 5 ✓
2262            input_port.send((1, 3));
2263            input_port.send((1, 3));
2264            // key 2 gets increments: 3 + 3 = 6, which is < 10 ✗
2265            input_port.send((2, 3));
2266            input_port.send((2, 3));
2267
2268            let results = output.collect_sorted::<Vec<_>>().await;
2269            assert_eq!(results, vec![(1, 5)]);
2270        });
2271
2272        assert!(count > 0);
2273    }
2274
2275    #[cfg(feature = "sim")]
2276    #[test]
2277    fn threshold_greater_or_equal_uniform() {
2278        let mut flow = FlowBuilder::new();
2279        let node = flow.process::<()>();
2280
2281        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2282
2283        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
2284            input.into_keyed().fold(
2285                q!(|| 0usize),
2286                q!(
2287                    |acc, v| *acc += v,
2288                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
2289                ),
2290            );
2291
2292        // Uniform threshold: all keys need value >= 5
2293        let threshold = node.singleton(q!(5usize));
2294
2295        let output = counts
2296            .threshold_greater_or_equal_uniform(threshold)
2297            .entries()
2298            .sim_output();
2299
2300        let count = flow.sim().exhaustive(async || {
2301            // key 1: 3 + 3 = 6 >= 5 ✓
2302            input_port.send((1, 3));
2303            input_port.send((1, 3));
2304            // key 2: 2 + 2 = 4 < 5 ✗
2305            input_port.send((2, 2));
2306            input_port.send((2, 2));
2307
2308            let results = output.collect_sorted::<Vec<_>>().await;
2309            assert_eq!(results, vec![(1, 5)]);
2310        });
2311
2312        assert!(count > 0);
2313    }
2314
2315    #[cfg(feature = "sim")]
2316    #[test]
2317    fn threshold_greater_or_equal_bounded_value() {
2318        let mut flow = FlowBuilder::new();
2319        let node = flow.process::<()>();
2320
2321        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2322        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
2323
2324        // BoundedValue keyed singleton (values fixed once per key via .first())
2325        let values = input.into_keyed().first();
2326
2327        // BoundedValue keyed singleton of thresholds
2328        let thresholds = thresh_input.into_keyed().first();
2329
2330        let output = values
2331            .threshold_greater_or_equal(thresholds)
2332            .entries()
2333            .sim_output();
2334
2335        let count = flow.sim().exhaustive(async || {
2336            // Set thresholds: key 1 needs >= 3, key 2 needs >= 10
2337            thresh_port.send((1, 3));
2338            thresh_port.send((2, 10));
2339
2340            // key 1 gets value 5 >= 3 ✓, key 2 gets value 4 < 10 ✗
2341            input_port.send((1, 5));
2342            input_port.send((2, 4));
2343
2344            let results = output.collect_sorted::<Vec<_>>().await;
2345            assert_eq!(results, vec![(1, 3)]);
2346        });
2347
2348        assert!(count > 0);
2349    }
2350
2351    #[cfg(feature = "sim")]
2352    #[test]
2353    fn threshold_greater_or_equal_uniform_bounded_value() {
2354        let mut flow = FlowBuilder::new();
2355        let node = flow.process::<()>();
2356
2357        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
2358
2359        // BoundedValue keyed singleton (values fixed once per key via .first())
2360        let values = input.into_keyed().first();
2361
2362        // Uniform threshold: all keys need value >= 5
2363        let threshold = node.singleton(q!(5usize));
2364
2365        let output = values
2366            .threshold_greater_or_equal_uniform(threshold)
2367            .entries()
2368            .sim_output();
2369
2370        let count = flow.sim().exhaustive(async || {
2371            // key 1 gets value 7 >= 5 ✓, key 2 gets value 3 < 5 ✗
2372            input_port.send((1, 7));
2373            input_port.send((2, 3));
2374
2375            let results = output.collect_sorted::<Vec<_>>().await;
2376            assert_eq!(results, vec![(1, 5)]);
2377        });
2378
2379        assert!(count > 0);
2380    }
2381
2382    #[cfg(feature = "sim")]
2383    #[test]
2384    fn threshold_greater_or_equal_bounded() {
2385        let mut flow = FlowBuilder::new();
2386        let node = flow.process::<()>();
2387
2388        // Bounded keyed singleton (fully known upfront)
2389        let values = node
2390            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
2391            .into_keyed()
2392            .first();
2393
2394        // BoundedValue thresholds (from async source)
2395        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
2396        let thresholds = thresh_input.into_keyed().first();
2397
2398        let output = values
2399            .threshold_greater_or_equal(thresholds)
2400            .entries()
2401            .sim_output();
2402
2403        let count = flow.sim().exhaustive(async || {
2404            thresh_port.send((1, 5));
2405            thresh_port.send((2, 10));
2406
2407            // key 1: 6 >= 5 ✓, key 2: 4 < 10 ✗
2408            let results = output.collect_sorted::<Vec<_>>().await;
2409            assert_eq!(results, vec![(1, 5)]);
2410        });
2411
2412        assert!(count > 0);
2413    }
2414
2415    #[cfg(feature = "sim")]
2416    #[test]
2417    fn threshold_greater_or_equal_uniform_bounded() {
2418        let mut flow = FlowBuilder::new();
2419        let node = flow.process::<()>();
2420
2421        let values = node
2422            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
2423            .into_keyed()
2424            .first();
2425        let threshold = node.singleton(q!(5usize));
2426
2427        let output = values
2428            .threshold_greater_or_equal_uniform(threshold)
2429            .entries()
2430            .sim_output();
2431
2432        let count = flow.sim().exhaustive(async || {
2433            // key 1: 6 >= 5 ✓, key 2: 4 < 5 ✗
2434            let results = output.collect_sorted::<Vec<_>>().await;
2435            assert_eq!(results, vec![(1, 5)]);
2436        });
2437
2438        assert!(count > 0);
2439    }
2440}