hydro_lang/graph/
render.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8// Re-export specific implementations
9pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::compile::ir::{DebugExpr, HydroNode, HydroRoot, HydroSource};
12
13/// Label for a graph node - can be either a static string or contain expressions.
14#[derive(Debug, Clone)]
15pub enum NodeLabel {
16    /// A static string label
17    Static(String),
18    /// A label with an operation name and expression arguments
19    WithExprs {
20        op_name: String,
21        exprs: Vec<DebugExpr>,
22    },
23}
24
25impl NodeLabel {
26    /// Create a static label
27    pub fn static_label(s: String) -> Self {
28        Self::Static(s)
29    }
30
31    /// Create a label for an operation with multiple expression
32    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
33        Self::WithExprs { op_name, exprs }
34    }
35}
36
37impl std::fmt::Display for NodeLabel {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Self::Static(s) => write!(f, "{}", s),
41            Self::WithExprs { op_name, exprs } => {
42                if exprs.is_empty() {
43                    write!(f, "{}()", op_name)
44                } else {
45                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
46                    write!(f, "{}({})", op_name, expr_strs.join(", "))
47                }
48            }
49        }
50    }
51}
52
53/// Base struct for text-based graph writers that use indentation.
54/// Contains common fields shared by DOT and Mermaid writers.
55pub struct IndentedGraphWriter<W> {
56    pub write: W,
57    pub indent: usize,
58    pub config: HydroWriteConfig,
59}
60
61impl<W> IndentedGraphWriter<W> {
62    /// Create a new writer with default configuration.
63    pub fn new(write: W) -> Self {
64        Self {
65            write,
66            indent: 0,
67            config: HydroWriteConfig::default(),
68        }
69    }
70
71    /// Create a new writer with the given configuration.
72    pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
73        Self {
74            write,
75            indent: 0,
76            config: config.clone(),
77        }
78    }
79}
80
81impl<W: Write> IndentedGraphWriter<W> {
82    /// Write an indented line using the current indentation level.
83    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
84        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
85    }
86}
87
88/// Common error type used by all graph writers.
89pub type GraphWriteError = std::fmt::Error;
90
91/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
92#[auto_impl(&mut, Box)]
93pub trait HydroGraphWrite {
94    /// Error type emitted by writing.
95    type Err: Error;
96
97    /// Begin the graph. First method called.
98    fn write_prologue(&mut self) -> Result<(), Self::Err>;
99
100    /// Write a node definition with styling.
101    fn write_node_definition(
102        &mut self,
103        node_id: usize,
104        node_label: &NodeLabel,
105        node_type: HydroNodeType,
106        location_id: Option<usize>,
107        location_type: Option<&str>,
108    ) -> Result<(), Self::Err>;
109
110    /// Write an edge between nodes with optional labeling.
111    fn write_edge(
112        &mut self,
113        src_id: usize,
114        dst_id: usize,
115        edge_type: HydroEdgeType,
116        label: Option<&str>,
117    ) -> Result<(), Self::Err>;
118
119    /// Begin writing a location grouping (process/cluster).
120    fn write_location_start(
121        &mut self,
122        location_id: usize,
123        location_type: &str,
124    ) -> Result<(), Self::Err>;
125
126    /// Write a node within a location.
127    fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
128
129    /// End writing a location grouping.
130    fn write_location_end(&mut self) -> Result<(), Self::Err>;
131
132    /// End the graph. Last method called.
133    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
134}
135
136/// Types of nodes in Hydro IR for styling purposes.
137#[derive(Debug, Clone, Copy)]
138pub enum HydroNodeType {
139    Source,
140    Transform,
141    Join,
142    Aggregation,
143    Network,
144    Sink,
145    Tee,
146}
147
148/// Types of edges in Hydro IR.
149#[derive(Debug, Clone, Copy)]
150pub enum HydroEdgeType {
151    Stream,
152    Persistent,
153    Network,
154    Cycle,
155}
156
157/// Configuration for graph writing.
158#[derive(Debug, Clone)]
159pub struct HydroWriteConfig {
160    pub show_metadata: bool,
161    pub show_location_groups: bool,
162    pub use_short_labels: bool,
163    pub process_id_name: Vec<(usize, String)>,
164    pub cluster_id_name: Vec<(usize, String)>,
165    pub external_id_name: Vec<(usize, String)>,
166}
167
168impl Default for HydroWriteConfig {
169    fn default() -> Self {
170        Self {
171            show_metadata: false,
172            show_location_groups: true,
173            use_short_labels: true, // Default to short labels for all renderers
174            process_id_name: vec![],
175            cluster_id_name: vec![],
176            external_id_name: vec![],
177        }
178    }
179}
180
181/// Graph structure tracker for Hydro IR rendering.
182#[derive(Debug, Default)]
183pub struct HydroGraphStructure {
184    pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>, /* node_id -> (label, type, location) */
185    pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>, // (src, dst, edge_type, label)
186    pub locations: HashMap<usize, String>,                         // location_id -> location_type
187    pub next_node_id: usize,
188}
189
190impl HydroGraphStructure {
191    pub fn new() -> Self {
192        Self::default()
193    }
194
195    pub fn add_node(
196        &mut self,
197        label: NodeLabel,
198        node_type: HydroNodeType,
199        location: Option<usize>,
200    ) -> usize {
201        let node_id = self.next_node_id;
202        self.next_node_id += 1;
203        self.nodes.insert(node_id, (label, node_type, location));
204        node_id
205    }
206
207    pub fn add_edge(
208        &mut self,
209        src: usize,
210        dst: usize,
211        edge_type: HydroEdgeType,
212        label: Option<String>,
213    ) {
214        self.edges.push((src, dst, edge_type, label));
215    }
216
217    pub fn add_location(&mut self, location_id: usize, location_type: String) {
218        self.locations.insert(location_id, location_type);
219    }
220}
221
222/// Function to extract an op_name from a print_root() result for use in labels.
223pub fn extract_op_name(full_label: String) -> String {
224    full_label
225        .split('(')
226        .next()
227        .unwrap_or("unknown")
228        .to_string()
229        .to_lowercase()
230}
231
232/// Extract a short, readable label from the full token stream label using print_root() style naming
233pub fn extract_short_label(full_label: &str) -> String {
234    // Use the same logic as extract_op_name but handle the specific cases we need for UI display
235    if let Some(op_name) = full_label.split('(').next() {
236        let base_name = op_name.to_lowercase();
237        match base_name.as_str() {
238            // Handle special cases for UI display
239            "source" => {
240                if full_label.contains("Iter") {
241                    "source_iter".to_string()
242                } else if full_label.contains("Stream") {
243                    "source_stream".to_string()
244                } else if full_label.contains("ExternalNetwork") {
245                    "external_network".to_string()
246                } else if full_label.contains("Spin") {
247                    "spin".to_string()
248                } else {
249                    "source".to_string()
250                }
251            }
252            "network" => {
253                if full_label.contains("deser") {
254                    "network(recv)".to_string()
255                } else if full_label.contains("ser") {
256                    "network(send)".to_string()
257                } else {
258                    "network".to_string()
259                }
260            }
261            // For all other cases, just use the lowercase base name (same as extract_op_name)
262            _ => base_name,
263        }
264    } else {
265        // Fallback for labels that don't follow the pattern
266        if full_label.len() > 20 {
267            format!("{}...", &full_label[..17])
268        } else {
269            full_label.to_string()
270        }
271    }
272}
273
274/// Helper function to extract location ID and type from metadata.
275fn extract_location_id(
276    metadata: &crate::compile::ir::HydroIrMetadata,
277) -> (Option<usize>, Option<String>) {
278    use crate::location::dynamic::LocationId;
279    match &metadata.location_kind {
280        LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
281        LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
282        LocationId::Tick(_, inner) => match inner.as_ref() {
283            LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
284            LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
285            _ => (None, None),
286        },
287    }
288}
289
290/// Helper function to set up location in structure from metadata.
291fn setup_location(
292    structure: &mut HydroGraphStructure,
293    metadata: &crate::compile::ir::HydroIrMetadata,
294) -> Option<usize> {
295    let (location_id, location_type) = extract_location_id(metadata);
296    if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
297        structure.add_location(loc_id, loc_type);
298    }
299    location_id
300}
301
302impl HydroRoot {
303    /// Core graph writing logic that works with any GraphWrite implementation.
304    pub fn write_graph<W>(
305        &self,
306        mut graph_write: W,
307        config: &HydroWriteConfig,
308    ) -> Result<(), W::Err>
309    where
310        W: HydroGraphWrite,
311    {
312        let mut structure = HydroGraphStructure::new();
313        let mut seen_tees = HashMap::new();
314
315        // Build the graph structure by traversing the IR
316        let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
317
318        // Write the graph
319        graph_write.write_prologue()?;
320
321        // Write node definitions
322        for (&node_id, (label, node_type, location)) in &structure.nodes {
323            let (location_id, location_type) = if let Some(loc_id) = location {
324                (
325                    Some(*loc_id),
326                    structure.locations.get(loc_id).map(|s| s.as_str()),
327                )
328            } else {
329                (None, None)
330            };
331
332            // Check if this is a label that came from an expression-containing operation
333            // We can detect this by looking for the pattern "op_name(...)" and checking if we have the original expressions
334            graph_write.write_node_definition(
335                node_id,
336                label,
337                *node_type,
338                location_id,
339                location_type,
340            )?;
341        }
342
343        // Group nodes by location if requested
344        if config.show_location_groups {
345            let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
346            for (&node_id, (_, _, location)) in &structure.nodes {
347                if let Some(location_id) = location {
348                    nodes_by_location
349                        .entry(*location_id)
350                        .or_default()
351                        .push(node_id);
352                }
353            }
354
355            for (&location_id, node_ids) in &nodes_by_location {
356                if let Some(location_type) = structure.locations.get(&location_id) {
357                    graph_write.write_location_start(location_id, location_type)?;
358                    for &node_id in node_ids {
359                        graph_write.write_node(node_id)?;
360                    }
361                    graph_write.write_location_end()?;
362                }
363            }
364        }
365
366        // Write edges
367        for (src_id, dst_id, edge_type, label) in &structure.edges {
368            graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
369        }
370
371        graph_write.write_epilogue()?;
372        Ok(())
373    }
374
375    /// Build the graph structure by traversing the IR tree.
376    pub fn build_graph_structure(
377        &self,
378        structure: &mut HydroGraphStructure,
379        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
380        config: &HydroWriteConfig,
381    ) -> usize {
382        // Helper function for sink nodes to reduce duplication
383        fn build_sink_node(
384            structure: &mut HydroGraphStructure,
385            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
386            config: &HydroWriteConfig,
387            input: &HydroNode,
388            metadata: Option<&crate::compile::ir::HydroIrMetadata>,
389            label: NodeLabel,
390            edge_type: HydroEdgeType,
391        ) -> usize {
392            let input_id = input.build_graph_structure(structure, seen_tees, config);
393            let location_id = metadata.and_then(|m| setup_location(structure, m));
394            let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
395            structure.add_edge(input_id, sink_id, edge_type, None);
396            sink_id
397        }
398
399        match self {
400            // Sink operations with Stream edges - grouped by edge type
401            HydroRoot::ForEach { f, input, .. } => build_sink_node(
402                structure,
403                seen_tees,
404                config,
405                input,
406                None,
407                NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
408                HydroEdgeType::Stream,
409            ),
410
411            HydroRoot::SendExternal {
412                to_external_id,
413                to_key,
414                input,
415                ..
416            } => build_sink_node(
417                structure,
418                seen_tees,
419                config,
420                input,
421                None,
422                NodeLabel::with_exprs(
423                    format!("send_external({}:{})", to_external_id, to_key),
424                    vec![],
425                ),
426                HydroEdgeType::Stream,
427            ),
428
429            HydroRoot::DestSink { sink, input, .. } => build_sink_node(
430                structure,
431                seen_tees,
432                config,
433                input,
434                None,
435                NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
436                HydroEdgeType::Stream,
437            ),
438
439            // Sink operation with Cycle edge - grouped by edge type
440            HydroRoot::CycleSink { ident, input, .. } => build_sink_node(
441                structure,
442                seen_tees,
443                config,
444                input,
445                None,
446                NodeLabel::static_label(format!("cycle_sink({})", ident)),
447                HydroEdgeType::Cycle,
448            ),
449        }
450    }
451}
452
453impl HydroNode {
454    /// Build the graph structure recursively for this node.
455    pub fn build_graph_structure(
456        &self,
457        structure: &mut HydroGraphStructure,
458        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
459        config: &HydroWriteConfig,
460    ) -> usize {
461        use crate::location::dynamic::LocationId;
462
463        // Helper functions to reduce duplication, categorized by input/expression patterns
464
465        /// Common parameters for transform builder functions to reduce argument count
466        struct TransformParams<'a> {
467            structure: &'a mut HydroGraphStructure,
468            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
469            config: &'a HydroWriteConfig,
470            input: &'a HydroNode,
471            metadata: &'a crate::compile::ir::HydroIrMetadata,
472            op_name: String,
473            node_type: HydroNodeType,
474            edge_type: HydroEdgeType,
475        }
476
477        // Single-input transform with no expressions
478        fn build_simple_transform(params: TransformParams) -> usize {
479            let input_id = params.input.build_graph_structure(
480                params.structure,
481                params.seen_tees,
482                params.config,
483            );
484            let location_id = setup_location(params.structure, params.metadata);
485            let node_id = params.structure.add_node(
486                NodeLabel::Static(params.op_name.to_string()),
487                params.node_type,
488                location_id,
489            );
490            params
491                .structure
492                .add_edge(input_id, node_id, params.edge_type, None);
493            node_id
494        }
495
496        // Single-input transform with one expression
497        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
498            let input_id = params.input.build_graph_structure(
499                params.structure,
500                params.seen_tees,
501                params.config,
502            );
503            let location_id = setup_location(params.structure, params.metadata);
504            let node_id = params.structure.add_node(
505                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
506                params.node_type,
507                location_id,
508            );
509            params
510                .structure
511                .add_edge(input_id, node_id, params.edge_type, None);
512            node_id
513        }
514
515        // Single-input transform with two expressions
516        fn build_dual_expr_transform(
517            params: TransformParams,
518            expr1: &DebugExpr,
519            expr2: &DebugExpr,
520        ) -> usize {
521            let input_id = params.input.build_graph_structure(
522                params.structure,
523                params.seen_tees,
524                params.config,
525            );
526            let location_id = setup_location(params.structure, params.metadata);
527            let node_id = params.structure.add_node(
528                NodeLabel::with_exprs(
529                    params.op_name.to_string(),
530                    vec![expr1.clone(), expr2.clone()],
531                ),
532                params.node_type,
533                location_id,
534            );
535            params
536                .structure
537                .add_edge(input_id, node_id, params.edge_type, None);
538            node_id
539        }
540
541        // Helper function for source nodes
542        fn build_source_node(
543            structure: &mut HydroGraphStructure,
544            metadata: &crate::compile::ir::HydroIrMetadata,
545            label: String,
546        ) -> usize {
547            let location_id = setup_location(structure, metadata);
548            structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
549        }
550
551        match self {
552            HydroNode::Placeholder => structure.add_node(
553                NodeLabel::Static("PLACEHOLDER".to_string()),
554                HydroNodeType::Transform,
555                None,
556            ),
557
558            HydroNode::Source {
559                source, metadata, ..
560            } => {
561                let label = match source {
562                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
563                    HydroSource::ExternalNetwork() => "external_network()".to_string(),
564                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
565                    HydroSource::Spin() => "spin()".to_string(),
566                };
567                build_source_node(structure, metadata, label)
568            }
569
570            HydroNode::ExternalInput {
571                from_external_id,
572                from_key,
573                metadata,
574                ..
575            } => build_source_node(
576                structure,
577                metadata,
578                format!("external_input({}:{})", from_external_id, from_key),
579            ),
580
581            HydroNode::CycleSource {
582                ident, metadata, ..
583            } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
584
585            HydroNode::Tee { inner, metadata } => {
586                let ptr = inner.as_ptr();
587                if let Some(&existing_id) = seen_tees.get(&ptr) {
588                    return existing_id;
589                }
590
591                let input_id = inner
592                    .0
593                    .borrow()
594                    .build_graph_structure(structure, seen_tees, config);
595                let location_id = setup_location(structure, metadata);
596
597                let tee_id = structure.add_node(
598                    NodeLabel::Static(extract_op_name(self.print_root())),
599                    HydroNodeType::Tee,
600                    location_id,
601                );
602
603                seen_tees.insert(ptr, tee_id);
604
605                structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
606
607                tee_id
608            }
609
610            // Transform operations with Stream edges - grouped by node/edge type
611            HydroNode::Delta { inner, metadata }
612            | HydroNode::DeferTick {
613                input: inner,
614                metadata,
615            }
616            | HydroNode::Enumerate {
617                input: inner,
618                metadata,
619                ..
620            }
621            | HydroNode::Unique {
622                input: inner,
623                metadata,
624            }
625            | HydroNode::ResolveFutures {
626                input: inner,
627                metadata,
628            }
629            | HydroNode::ResolveFuturesOrdered {
630                input: inner,
631                metadata,
632            } => build_simple_transform(TransformParams {
633                structure,
634                seen_tees,
635                config,
636                input: inner,
637                metadata,
638                op_name: extract_op_name(self.print_root()),
639                node_type: HydroNodeType::Transform,
640                edge_type: HydroEdgeType::Stream,
641            }),
642
643            // Transform operation with Persistent edge - grouped by node/edge type
644            HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
645                structure,
646                seen_tees,
647                config,
648                input: inner,
649                metadata,
650                op_name: extract_op_name(self.print_root()),
651                node_type: HydroNodeType::Transform,
652                edge_type: HydroEdgeType::Persistent,
653            }),
654
655            // Aggregation operation with Stream edge - grouped by node/edge type
656            HydroNode::Sort {
657                input: inner,
658                metadata,
659            } => build_simple_transform(TransformParams {
660                structure,
661                seen_tees,
662                config,
663                input: inner,
664                metadata,
665                op_name: extract_op_name(self.print_root()),
666                node_type: HydroNodeType::Aggregation,
667                edge_type: HydroEdgeType::Stream,
668            }),
669
670            // Single-expression Transform operations - grouped by node type
671            HydroNode::Map { f, input, metadata }
672            | HydroNode::Filter { f, input, metadata }
673            | HydroNode::FlatMap { f, input, metadata }
674            | HydroNode::FilterMap { f, input, metadata }
675            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
676                TransformParams {
677                    structure,
678                    seen_tees,
679                    config,
680                    input,
681                    metadata,
682                    op_name: extract_op_name(self.print_root()),
683                    node_type: HydroNodeType::Transform,
684                    edge_type: HydroEdgeType::Stream,
685                },
686                f,
687            ),
688
689            // Single-expression Aggregation operations - grouped by node type
690            HydroNode::Reduce { f, input, metadata }
691            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
692                TransformParams {
693                    structure,
694                    seen_tees,
695                    config,
696                    input,
697                    metadata,
698                    op_name: extract_op_name(self.print_root()),
699                    node_type: HydroNodeType::Aggregation,
700                    edge_type: HydroEdgeType::Stream,
701                },
702                f,
703            ),
704
705            // Join-like operations with left/right edge labels - grouped by edge labeling
706            HydroNode::Join {
707                left,
708                right,
709                metadata,
710            }
711            | HydroNode::CrossProduct {
712                left,
713                right,
714                metadata,
715            }
716            | HydroNode::CrossSingleton {
717                left,
718                right,
719                metadata,
720            } => {
721                let left_id = left.build_graph_structure(structure, seen_tees, config);
722                let right_id = right.build_graph_structure(structure, seen_tees, config);
723                let location_id = setup_location(structure, metadata);
724                let node_id = structure.add_node(
725                    NodeLabel::Static(extract_op_name(self.print_root())),
726                    HydroNodeType::Join,
727                    location_id,
728                );
729                structure.add_edge(
730                    left_id,
731                    node_id,
732                    HydroEdgeType::Stream,
733                    Some("left".to_string()),
734                );
735                structure.add_edge(
736                    right_id,
737                    node_id,
738                    HydroEdgeType::Stream,
739                    Some("right".to_string()),
740                );
741                node_id
742            }
743
744            // Join-like operations with pos/neg edge labels - grouped by edge labeling
745            HydroNode::Difference {
746                pos: left,
747                neg: right,
748                metadata,
749            }
750            | HydroNode::AntiJoin {
751                pos: left,
752                neg: right,
753                metadata,
754            } => {
755                let left_id = left.build_graph_structure(structure, seen_tees, config);
756                let right_id = right.build_graph_structure(structure, seen_tees, config);
757                let location_id = setup_location(structure, metadata);
758                let node_id = structure.add_node(
759                    NodeLabel::Static(extract_op_name(self.print_root())),
760                    HydroNodeType::Join,
761                    location_id,
762                );
763                structure.add_edge(
764                    left_id,
765                    node_id,
766                    HydroEdgeType::Stream,
767                    Some("pos".to_string()),
768                );
769                structure.add_edge(
770                    right_id,
771                    node_id,
772                    HydroEdgeType::Stream,
773                    Some("neg".to_string()),
774                );
775                node_id
776            }
777
778            // Dual expression transforms - consolidated using pattern matching
779            HydroNode::Fold {
780                init,
781                acc,
782                input,
783                metadata,
784            }
785            | HydroNode::FoldKeyed {
786                init,
787                acc,
788                input,
789                metadata,
790            }
791            | HydroNode::Scan {
792                init,
793                acc,
794                input,
795                metadata,
796            } => {
797                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
798
799                build_dual_expr_transform(
800                    TransformParams {
801                        structure,
802                        seen_tees,
803                        config,
804                        input,
805                        metadata,
806                        op_name: extract_op_name(self.print_root()),
807                        node_type,
808                        edge_type: HydroEdgeType::Stream,
809                    },
810                    init,
811                    acc,
812                )
813            }
814
815            // Combination of join and transform
816            HydroNode::ReduceKeyedWatermark {
817                f,
818                input,
819                watermark,
820                metadata,
821            } => {
822                let input_id = input.build_graph_structure(structure, seen_tees, config);
823                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
824                let location_id = setup_location(structure, metadata);
825                let join_node_id = structure.add_node(
826                    NodeLabel::Static(extract_op_name(self.print_root())),
827                    HydroNodeType::Join,
828                    location_id,
829                );
830                structure.add_edge(
831                    input_id,
832                    join_node_id,
833                    HydroEdgeType::Stream,
834                    Some("input".to_string()),
835                );
836                structure.add_edge(
837                    watermark_id,
838                    join_node_id,
839                    HydroEdgeType::Stream,
840                    Some("watermark".to_string()),
841                );
842
843                let node_id = structure.add_node(
844                    NodeLabel::with_exprs(
845                        extract_op_name(self.print_root()).to_string(),
846                        vec![f.clone()],
847                    ),
848                    HydroNodeType::Aggregation,
849                    location_id,
850                );
851                structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
852                node_id
853            }
854
855            HydroNode::Network {
856                serialize_fn,
857                deserialize_fn,
858                input,
859                metadata,
860                ..
861            } => {
862                let input_id = input.build_graph_structure(structure, seen_tees, config);
863                let _from_location_id = setup_location(structure, metadata);
864
865                let to_location_id = match metadata.location_kind.root() {
866                    LocationId::Process(id) => {
867                        structure.add_location(*id, "Process".to_string());
868                        Some(*id)
869                    }
870                    LocationId::Cluster(id) => {
871                        structure.add_location(*id, "Cluster".to_string());
872                        Some(*id)
873                    }
874                    _ => None,
875                };
876
877                let mut label = "network(".to_string();
878                if serialize_fn.is_some() {
879                    label.push_str("ser");
880                }
881                if deserialize_fn.is_some() {
882                    if serialize_fn.is_some() {
883                        label.push_str(" + ");
884                    }
885                    label.push_str("deser");
886                }
887                label.push(')');
888
889                let network_id = structure.add_node(
890                    NodeLabel::Static(label),
891                    HydroNodeType::Network,
892                    to_location_id,
893                );
894                structure.add_edge(
895                    input_id,
896                    network_id,
897                    HydroEdgeType::Network,
898                    Some(format!("to {:?}", to_location_id)),
899                );
900                network_id
901            }
902
903            // Handle remaining node types
904            HydroNode::Unpersist { inner, .. } => {
905                // Unpersist is typically optimized away, just pass through
906                inner.build_graph_structure(structure, seen_tees, config)
907            }
908
909            HydroNode::Chain {
910                first,
911                second,
912                metadata,
913            } => {
914                let first_id = first.build_graph_structure(structure, seen_tees, config);
915                let second_id = second.build_graph_structure(structure, seen_tees, config);
916                let location_id = setup_location(structure, metadata);
917                let chain_id = structure.add_node(
918                    NodeLabel::Static(extract_op_name(self.print_root())),
919                    HydroNodeType::Transform,
920                    location_id,
921                );
922                structure.add_edge(
923                    first_id,
924                    chain_id,
925                    HydroEdgeType::Stream,
926                    Some("first".to_string()),
927                );
928                structure.add_edge(
929                    second_id,
930                    chain_id,
931                    HydroEdgeType::Stream,
932                    Some("second".to_string()),
933                );
934                chain_id
935            }
936
937            HydroNode::ChainFirst {
938                first,
939                second,
940                metadata,
941            } => {
942                let first_id = first.build_graph_structure(structure, seen_tees, config);
943                let second_id = second.build_graph_structure(structure, seen_tees, config);
944                let location_id = setup_location(structure, metadata);
945                let chain_id = structure.add_node(
946                    NodeLabel::Static(extract_op_name(self.print_root())),
947                    HydroNodeType::Transform,
948                    location_id,
949                );
950                structure.add_edge(
951                    first_id,
952                    chain_id,
953                    HydroEdgeType::Stream,
954                    Some("first".to_string()),
955                );
956                structure.add_edge(
957                    second_id,
958                    chain_id,
959                    HydroEdgeType::Stream,
960                    Some("second".to_string()),
961                );
962                chain_id
963            }
964
965            HydroNode::Counter {
966                tag: _,
967                prefix: _,
968                duration,
969                input,
970                metadata,
971            } => build_single_expr_transform(
972                TransformParams {
973                    structure,
974                    seen_tees,
975                    config,
976                    input,
977                    metadata,
978                    op_name: extract_op_name(self.print_root()),
979                    node_type: HydroNodeType::Transform,
980                    edge_type: HydroEdgeType::Stream,
981                },
982                duration,
983            ),
984        }
985    }
986}
987
988/// Utility functions for rendering multiple roots as a single graph.
989/// Macro to reduce duplication in render functions.
990macro_rules! render_hydro_ir {
991    ($name:ident, $write_fn:ident) => {
992        pub fn $name(roots: &[HydroRoot], config: &HydroWriteConfig) -> String {
993            let mut output = String::new();
994            $write_fn(&mut output, roots, config).unwrap();
995            output
996        }
997    };
998}
999
1000/// Macro to reduce duplication in write functions.
1001macro_rules! write_hydro_ir {
1002    ($name:ident, $writer_type:ty, $constructor:expr) => {
1003        pub fn $name(
1004            output: impl std::fmt::Write,
1005            roots: &[HydroRoot],
1006            config: &HydroWriteConfig,
1007        ) -> std::fmt::Result {
1008            let mut graph_write: $writer_type = $constructor(output, config);
1009            write_hydro_ir_graph(&mut graph_write, roots, config)
1010        }
1011    };
1012}
1013
1014render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
1015write_hydro_ir!(
1016    write_hydro_ir_mermaid,
1017    HydroMermaid<_>,
1018    HydroMermaid::new_with_config
1019);
1020
1021render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1022write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1023
1024render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
1025write_hydro_ir!(
1026    write_hydro_ir_reactflow,
1027    HydroReactFlow<_>,
1028    HydroReactFlow::new
1029);
1030
1031fn write_hydro_ir_graph<W>(
1032    mut graph_write: W,
1033    roots: &[HydroRoot],
1034    config: &HydroWriteConfig,
1035) -> Result<(), W::Err>
1036where
1037    W: HydroGraphWrite,
1038{
1039    let mut structure = HydroGraphStructure::new();
1040    let mut seen_tees = HashMap::new();
1041
1042    // Build the graph structure for all roots
1043    for leaf in roots {
1044        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1045    }
1046
1047    // Write the graph using the same logic as individual roots
1048    graph_write.write_prologue()?;
1049
1050    for (&node_id, (label, node_type, location)) in &structure.nodes {
1051        let (location_id, location_type) = if let Some(loc_id) = location {
1052            (
1053                Some(*loc_id),
1054                structure.locations.get(loc_id).map(|s| s.as_str()),
1055            )
1056        } else {
1057            (None, None)
1058        };
1059        graph_write.write_node_definition(
1060            node_id,
1061            label,
1062            *node_type,
1063            location_id,
1064            location_type,
1065        )?;
1066    }
1067
1068    if config.show_location_groups {
1069        let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1070        for (&node_id, (_, _, location)) in &structure.nodes {
1071            if let Some(location_id) = location {
1072                nodes_by_location
1073                    .entry(*location_id)
1074                    .or_default()
1075                    .push(node_id);
1076            }
1077        }
1078
1079        for (&location_id, node_ids) in &nodes_by_location {
1080            if let Some(location_type) = structure.locations.get(&location_id) {
1081                graph_write.write_location_start(location_id, location_type)?;
1082                for &node_id in node_ids {
1083                    graph_write.write_node(node_id)?;
1084                }
1085                graph_write.write_location_end()?;
1086            }
1087        }
1088    }
1089
1090    for (src_id, dst_id, edge_type, label) in &structure.edges {
1091        graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1092    }
1093
1094    graph_write.write_epilogue()?;
1095    Ok(())
1096}