Skip to main content

hydro_lang/location/
process.rs

//! Definition of the [`Process`] location type, representing a single-node
//! compute location in a distributed Hydro program.
//!
//! A [`Process`] is the simplest kind of location: it corresponds to exactly one
//! machine (or OS process) and all live collections placed on it are materialized
//! on that single node. Use a process when the computation does not need to be
//! replicated or partitioned across multiple nodes.
//!
//! Processes are created via [`FlowBuilder::process`](crate::compile::builder::FlowBuilder::process)
//! and are parameterized by a **tag type** (`ProcessTag`) that lets the type
//! system distinguish different processes at compile time.
12
13use std::fmt::{Debug, Formatter};
14use std::marker::PhantomData;
15
16use super::{Location, LocationId};
17use crate::compile::builder::FlowState;
18use crate::location::{LocationKey, TopLevel};
19use crate::staging_util::Invariant;
20
21/// A single-node location in a distributed Hydro program.
22///
23/// `Process` represents exactly one machine (or OS process) and is one of the
24/// core location types that implements the [`Location`] trait. Live collections
25/// placed on a `Process` are materialized entirely on that single node.
26///
27/// The type parameter `ProcessTag` is a compile-time marker that differentiates
28/// distinct processes in the same dataflow graph (e.g. `Process<'a, Leader>` vs
29/// `Process<'a, Follower>`). It defaults to `()` when only one process is
30/// needed.
31///
32/// # Creating a Process
33/// ```rust
34/// # use hydro_lang::prelude::*;
35/// struct MyTag;
36/// let mut flow = FlowBuilder::new();
37/// let node = flow.process::<MyTag>();
38/// # let _ = &node;
39/// # let _ = flow.finalize();
40/// ```
41pub struct Process<'a, ProcessTag = ()> {
42    pub(crate) key: LocationKey,
43    pub(crate) flow_state: FlowState,
44    pub(crate) _phantom: Invariant<'a, ProcessTag>,
45}
46
47impl<P> Debug for Process<'_, P> {
48    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49        write!(f, "Process({})", self.key)
50    }
51}
52
53impl<P> Eq for Process<'_, P> {}
54impl<P> PartialEq for Process<'_, P> {
55    fn eq(&self, other: &Self) -> bool {
56        self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
57    }
58}
59
60impl<P> Clone for Process<'_, P> {
61    fn clone(&self) -> Self {
62        Process {
63            key: self.key,
64            flow_state: self.flow_state.clone(),
65            _phantom: PhantomData,
66        }
67    }
68}
69
70impl<'a, P> super::dynamic::DynLocation for Process<'a, P> {
71    fn dyn_id(&self) -> LocationId {
72        LocationId::Process(self.key)
73    }
74
75    fn flow_state(&self) -> &FlowState {
76        &self.flow_state
77    }
78
79    fn is_top_level() -> bool {
80        true
81    }
82
83    fn multiversioned(&self) -> bool {
84        false // processes are always single-versioned
85    }
86
87    fn cluster_consistency() -> Option<super::dynamic::ClusterConsistency> {
88        None
89    }
90}
91
92impl<'a, P> Location<'a> for Process<'a, P> {
93    type Root = Self;
94
95    type DropConsistency = Self;
96
97    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
98        None
99    }
100
101    fn root(&self) -> Self::Root {
102        self.clone()
103    }
104
105    fn drop_consistency(&self) -> Self::DropConsistency {
106        self.clone()
107    }
108
109    fn from_drop_consistency(l2: Self::DropConsistency) -> Self {
110        l2
111    }
112}
113
114impl<'a, P> TopLevel<'a> for Process<'a, P> {}