zernel/telemetry/
client.rs

1// Copyright (C) 2026 Dyber, Inc. — Proprietary
2
3use futures_util::StreamExt;
4use serde::Deserialize;
5use tokio::sync::mpsc;
6use tokio_tungstenite::connect_async;
7use tracing::{debug, warn};
8
9/// Telemetry snapshot received from zerneld via WebSocket.
10/// Must match the JSON format from AggregatedMetrics::to_ws_snapshot().
11#[derive(Debug, Clone, Deserialize)]
12pub struct TelemetrySnapshot {
13    #[serde(default)]
14    pub gpu_utilization: Vec<GpuEntry>,
15    #[serde(default)]
16    pub cuda_latency_p50_us: f64,
17    #[serde(default)]
18    pub cuda_latency_p99_us: f64,
19    #[serde(default)]
20    pub nccl_allreduce_p50_ms: f64,
21    #[serde(default)]
22    pub nccl_allreduce_p99_ms: f64,
23    #[serde(default)]
24    pub dataloader_wait_p50_ms: f64,
25    #[serde(default)]
26    pub last_update_ms: u64,
27}
28
29/// GPU memory entry from zerneld (matches to_ws_snapshot format).
30#[derive(Debug, Clone, Deserialize)]
31pub struct GpuEntry {
32    pub key: String,
33    pub current_bytes: u64,
34    pub peak_bytes: u64,
35}
36
/// Client that connects to the zerneld WebSocket endpoint and receives telemetry.
///
/// The client only stores the target URL; it is cheap to clone and holds no
/// open connection by itself.
#[derive(Debug, Clone)]
pub struct TelemetryClient {
    /// WebSocket URL of the zerneld endpoint (`ws://host:port`).
    url: String,
}
41
42impl TelemetryClient {
43    pub fn new(host: &str, port: u16) -> Self {
44        Self {
45            url: format!("ws://{host}:{port}"),
46        }
47    }
48
49    /// Try to connect to zerneld. Returns a receiver channel if successful.
50    /// The connection runs in a background task with automatic reconnection.
51    pub async fn try_connect(&self) -> Option<mpsc::UnboundedReceiver<TelemetrySnapshot>> {
52        let url = self.url.clone();
53
54        // Quick connectivity check
55        match connect_async(&url).await {
56            Ok((ws, _)) => {
57                let (tx, rx) = mpsc::unbounded_channel();
58                let url_clone = url.clone();
59
60                tokio::spawn(async move {
61                    Self::reader_loop(ws, tx.clone(), url_clone).await;
62                });
63
64                debug!(url = self.url, "connected to zerneld");
65                Some(rx)
66            }
67            Err(e) => {
68                debug!(url = self.url, error = %e, "could not connect to zerneld");
69                None
70            }
71        }
72    }
73
74    async fn reader_loop(
75        ws: tokio_tungstenite::WebSocketStream<
76            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
77        >,
78        tx: mpsc::UnboundedSender<TelemetrySnapshot>,
79        url: String,
80    ) {
81        let (_write, mut read) = ws.split();
82        let mut backoff_ms = 1000u64;
83
84        loop {
85            match read.next().await {
86                Some(Ok(msg)) => {
87                    if let tokio_tungstenite::tungstenite::Message::Text(text) = msg {
88                        match serde_json::from_str::<TelemetrySnapshot>(&text) {
89                            Ok(snapshot) => {
90                                if tx.send(snapshot).is_err() {
91                                    // Receiver dropped
92                                    return;
93                                }
94                                backoff_ms = 1000; // reset on success
95                            }
96                            Err(e) => {
97                                debug!(error = %e, "failed to parse telemetry snapshot");
98                            }
99                        }
100                    }
101                }
102                Some(Err(e)) => {
103                    warn!(error = %e, "WebSocket error, reconnecting...");
104                    break;
105                }
106                None => {
107                    warn!("WebSocket closed, reconnecting...");
108                    break;
109                }
110            }
111        }
112
113        // Reconnection with exponential backoff
114        loop {
115            tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
116            match connect_async(&url).await {
117                Ok((ws, _)) => {
118                    debug!("reconnected to zerneld");
119                    // Recurse into reader loop with new connection
120                    Box::pin(Self::reader_loop(ws, tx, url)).await;
121                    return;
122                }
123                Err(e) => {
124                    debug!(error = %e, backoff_ms, "reconnect failed");
125                    backoff_ms = (backoff_ms * 2).min(30_000);
126                }
127            }
128        }
129    }
130}
131
/// Returns the port of the zerneld WebSocket endpoint.
///
/// Reads `ZERNEL_WS_PORT`; when the variable is unset, not valid Unicode, or
/// does not parse as a `u16`, the default `9092` is returned.
pub fn ws_port() -> u16 {
    const DEFAULT_WS_PORT: u16 = 9092;

    match std::env::var("ZERNEL_WS_PORT") {
        Ok(raw) => match raw.parse::<u16>() {
            Ok(port) => port,
            Err(_) => DEFAULT_WS_PORT,
        },
        Err(_) => DEFAULT_WS_PORT,
    }
}
139
/// Returns the port of the zerneld metrics endpoint.
///
/// Reads `ZERNEL_METRICS_PORT`; when the variable is unset, not valid Unicode,
/// or does not parse as a `u16`, the default `9091` is returned.
pub fn metrics_port() -> u16 {
    const DEFAULT_METRICS_PORT: u16 = 9091;

    if let Ok(raw) = std::env::var("ZERNEL_METRICS_PORT") {
        if let Ok(port) = raw.parse::<u16>() {
            return port;
        }
    }
    DEFAULT_METRICS_PORT
}