1use futures_util::StreamExt;
6use tokio::sync::broadcast;
7use tokio_tungstenite::connect_async;
8use tracing::{debug, info, warn};
9
10pub async fn relay_zerneld_to_sse(url: &str, tx: broadcast::Sender<String>) {
12 let mut backoff_ms = 1000u64;
13
14 loop {
15 info!(url, "connecting to zerneld...");
16
17 match connect_async(url).await {
18 Ok((ws, _)) => {
19 info!("connected to zerneld");
20 backoff_ms = 1000;
21
22 let (_write, mut read) = ws.split();
23
24 while let Some(msg) = read.next().await {
25 match msg {
26 Ok(tokio_tungstenite::tungstenite::Message::Text(text)) => {
27 if let Ok(data) = serde_json::from_str::<serde_json::Value>(&text) {
28 let html = render_telemetry_html(&data);
29 let _ = tx.send(html);
30 }
31 }
32 Ok(tokio_tungstenite::tungstenite::Message::Close(_)) => {
33 info!("zerneld WebSocket closed");
34 break;
35 }
36 Err(e) => {
37 warn!(error = %e, "WebSocket error");
38 break;
39 }
40 _ => {}
41 }
42 }
43 }
44 Err(e) => {
45 debug!(error = %e, backoff_ms, "failed to connect to zerneld");
46 }
47 }
48
49 tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
50 backoff_ms = (backoff_ms * 2).min(30_000);
51 }
52}
53
54fn render_telemetry_html(data: &serde_json::Value) -> String {
56 let cuda_p50 = data["cuda_latency_p50_us"].as_f64().unwrap_or(0.0);
57 let cuda_p99 = data["cuda_latency_p99_us"].as_f64().unwrap_or(0.0);
58 let nccl_p50 = data["nccl_allreduce_p50_ms"].as_f64().unwrap_or(0.0);
59 let nccl_p99 = data["nccl_allreduce_p99_ms"].as_f64().unwrap_or(0.0);
60 let dl_p50 = data["dataloader_wait_p50_ms"].as_f64().unwrap_or(0.0);
61
62 let mut gpu_html = String::new();
63 if let Some(gpus) = data["gpu_utilization"].as_array() {
64 for gpu in gpus {
65 let key = gpu["key"].as_str().unwrap_or("?");
66 let used = gpu["current_bytes"].as_u64().unwrap_or(0);
67 let peak = gpu["peak_bytes"].as_u64().unwrap_or(1);
68 let pct = if peak > 0 {
69 (used as f64 / peak as f64 * 100.0) as u32
70 } else {
71 0
72 };
73 let used_gb = used as f64 / (1024.0 * 1024.0 * 1024.0);
74 let total_gb = peak as f64 / (1024.0 * 1024.0 * 1024.0);
75 let color = if pct > 90 {
76 "#22c55e"
77 } else if pct > 70 {
78 "#eab308"
79 } else {
80 "#ef4444"
81 };
82
83 gpu_html.push_str(&format!(
84 r#"<div class="gpu-card">
85 <div class="gpu-label">{key} — {used_gb:.1}/{total_gb:.1} GB</div>
86 <div class="gpu-bar-bg"><div class="gpu-bar" style="width:{pct}%;background:{color}"></div></div>
87 <div class="gpu-pct">{pct}%</div>
88 </div>"#
89 ));
90 }
91 }
92
93 format!(
94 r#"<div id="gpu-section">{gpu_html}</div>
95 <div id="telemetry-section">
96 <table>
97 <tr><td>CUDA launch p50</td><td>{cuda_p50:.0} us</td></tr>
98 <tr><td>CUDA launch p99</td><td>{cuda_p99:.0} us</td></tr>
99 <tr><td>NCCL allreduce p50</td><td>{nccl_p50:.0} ms</td></tr>
100 <tr><td>NCCL allreduce p99</td><td>{nccl_p99:.0} ms</td></tr>
101 <tr><td>DataLoader wait p50</td><td>{dl_p50:.0} ms</td></tr>
102 </table>
103 </div>"#
104 )
105}