zernel_dashboard/
sse.rs

1// Copyright (C) 2026 Dyber, Inc. — Proprietary
2
3//! Relays zerneld WebSocket telemetry to SSE broadcast for the dashboard.
4
5use futures_util::StreamExt;
6use tokio::sync::broadcast;
7use tokio_tungstenite::connect_async;
8use tracing::{debug, info, warn};
9
10/// Connect to zerneld WebSocket and relay telemetry as SSE-formatted HTML.
11pub async fn relay_zerneld_to_sse(url: &str, tx: broadcast::Sender<String>) {
12    let mut backoff_ms = 1000u64;
13
14    loop {
15        info!(url, "connecting to zerneld...");
16
17        match connect_async(url).await {
18            Ok((ws, _)) => {
19                info!("connected to zerneld");
20                backoff_ms = 1000;
21
22                let (_write, mut read) = ws.split();
23
24                while let Some(msg) = read.next().await {
25                    match msg {
26                        Ok(tokio_tungstenite::tungstenite::Message::Text(text)) => {
27                            if let Ok(data) = serde_json::from_str::<serde_json::Value>(&text) {
28                                let html = render_telemetry_html(&data);
29                                let _ = tx.send(html);
30                            }
31                        }
32                        Ok(tokio_tungstenite::tungstenite::Message::Close(_)) => {
33                            info!("zerneld WebSocket closed");
34                            break;
35                        }
36                        Err(e) => {
37                            warn!(error = %e, "WebSocket error");
38                            break;
39                        }
40                        _ => {}
41                    }
42                }
43            }
44            Err(e) => {
45                debug!(error = %e, backoff_ms, "failed to connect to zerneld");
46            }
47        }
48
49        tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
50        backoff_ms = (backoff_ms * 2).min(30_000);
51    }
52}
53
54/// Render telemetry JSON as an HTML fragment for htmx SSE swap.
55fn render_telemetry_html(data: &serde_json::Value) -> String {
56    let cuda_p50 = data["cuda_latency_p50_us"].as_f64().unwrap_or(0.0);
57    let cuda_p99 = data["cuda_latency_p99_us"].as_f64().unwrap_or(0.0);
58    let nccl_p50 = data["nccl_allreduce_p50_ms"].as_f64().unwrap_or(0.0);
59    let nccl_p99 = data["nccl_allreduce_p99_ms"].as_f64().unwrap_or(0.0);
60    let dl_p50 = data["dataloader_wait_p50_ms"].as_f64().unwrap_or(0.0);
61
62    let mut gpu_html = String::new();
63    if let Some(gpus) = data["gpu_utilization"].as_array() {
64        for gpu in gpus {
65            let key = gpu["key"].as_str().unwrap_or("?");
66            let used = gpu["current_bytes"].as_u64().unwrap_or(0);
67            let peak = gpu["peak_bytes"].as_u64().unwrap_or(1);
68            let pct = if peak > 0 {
69                (used as f64 / peak as f64 * 100.0) as u32
70            } else {
71                0
72            };
73            let used_gb = used as f64 / (1024.0 * 1024.0 * 1024.0);
74            let total_gb = peak as f64 / (1024.0 * 1024.0 * 1024.0);
75            let color = if pct > 90 {
76                "#22c55e"
77            } else if pct > 70 {
78                "#eab308"
79            } else {
80                "#ef4444"
81            };
82
83            gpu_html.push_str(&format!(
84                r#"<div class="gpu-card">
85                    <div class="gpu-label">{key} &mdash; {used_gb:.1}/{total_gb:.1} GB</div>
86                    <div class="gpu-bar-bg"><div class="gpu-bar" style="width:{pct}%;background:{color}"></div></div>
87                    <div class="gpu-pct">{pct}%</div>
88                </div>"#
89            ));
90        }
91    }
92
93    format!(
94        r#"<div id="gpu-section">{gpu_html}</div>
95        <div id="telemetry-section">
96            <table>
97                <tr><td>CUDA launch p50</td><td>{cuda_p50:.0} us</td></tr>
98                <tr><td>CUDA launch p99</td><td>{cuda_p99:.0} us</td></tr>
99                <tr><td>NCCL allreduce p50</td><td>{nccl_p50:.0} ms</td></tr>
100                <tr><td>NCCL allreduce p99</td><td>{nccl_p99:.0} ms</td></tr>
101                <tr><td>DataLoader wait p50</td><td>{dl_p50:.0} ms</td></tr>
102            </table>
103        </div>"#
104    )
105}