zernel/telemetry/
client.rs1use futures_util::StreamExt;
4use serde::Deserialize;
5use tokio::sync::mpsc;
6use tokio_tungstenite::connect_async;
7use tracing::{debug, warn};
8
9#[derive(Debug, Clone, Deserialize)]
12pub struct TelemetrySnapshot {
13 #[serde(default)]
14 pub gpu_utilization: Vec<GpuEntry>,
15 #[serde(default)]
16 pub cuda_latency_p50_us: f64,
17 #[serde(default)]
18 pub cuda_latency_p99_us: f64,
19 #[serde(default)]
20 pub nccl_allreduce_p50_ms: f64,
21 #[serde(default)]
22 pub nccl_allreduce_p99_ms: f64,
23 #[serde(default)]
24 pub dataloader_wait_p50_ms: f64,
25 #[serde(default)]
26 pub last_update_ms: u64,
27}
28
29#[derive(Debug, Clone, Deserialize)]
31pub struct GpuEntry {
32 pub key: String,
33 pub current_bytes: u64,
34 pub peak_bytes: u64,
35}
36
/// Client for zerneld's telemetry WebSocket.
///
/// Holds only the target URL; no connection exists until
/// [`TelemetryClient::try_connect`] is called.
#[derive(Debug, Clone)]
pub struct TelemetryClient {
    /// Full `ws://host:port` URL, built by [`TelemetryClient::new`].
    url: String,
}
41
42impl TelemetryClient {
43 pub fn new(host: &str, port: u16) -> Self {
44 Self {
45 url: format!("ws://{host}:{port}"),
46 }
47 }
48
49 pub async fn try_connect(&self) -> Option<mpsc::UnboundedReceiver<TelemetrySnapshot>> {
52 let url = self.url.clone();
53
54 match connect_async(&url).await {
56 Ok((ws, _)) => {
57 let (tx, rx) = mpsc::unbounded_channel();
58 let url_clone = url.clone();
59
60 tokio::spawn(async move {
61 Self::reader_loop(ws, tx.clone(), url_clone).await;
62 });
63
64 debug!(url = self.url, "connected to zerneld");
65 Some(rx)
66 }
67 Err(e) => {
68 debug!(url = self.url, error = %e, "could not connect to zerneld");
69 None
70 }
71 }
72 }
73
74 async fn reader_loop(
75 ws: tokio_tungstenite::WebSocketStream<
76 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
77 >,
78 tx: mpsc::UnboundedSender<TelemetrySnapshot>,
79 url: String,
80 ) {
81 let (_write, mut read) = ws.split();
82 let mut backoff_ms = 1000u64;
83
84 loop {
85 match read.next().await {
86 Some(Ok(msg)) => {
87 if let tokio_tungstenite::tungstenite::Message::Text(text) = msg {
88 match serde_json::from_str::<TelemetrySnapshot>(&text) {
89 Ok(snapshot) => {
90 if tx.send(snapshot).is_err() {
91 return;
93 }
94 backoff_ms = 1000; }
96 Err(e) => {
97 debug!(error = %e, "failed to parse telemetry snapshot");
98 }
99 }
100 }
101 }
102 Some(Err(e)) => {
103 warn!(error = %e, "WebSocket error, reconnecting...");
104 break;
105 }
106 None => {
107 warn!("WebSocket closed, reconnecting...");
108 break;
109 }
110 }
111 }
112
113 loop {
115 tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
116 match connect_async(&url).await {
117 Ok((ws, _)) => {
118 debug!("reconnected to zerneld");
119 Box::pin(Self::reader_loop(ws, tx, url)).await;
121 return;
122 }
123 Err(e) => {
124 debug!(error = %e, backoff_ms, "reconnect failed");
125 backoff_ms = (backoff_ms * 2).min(30_000);
126 }
127 }
128 }
129 }
130}
131
/// Returns the WebSocket port.
///
/// Uses `ZERNEL_WS_PORT` when it is set and parses as a `u16`. Otherwise
/// (unset, not valid Unicode, or not a valid port number) falls back to `9092`.
pub fn ws_port() -> u16 {
    match std::env::var("ZERNEL_WS_PORT").map(|raw| raw.parse::<u16>()) {
        Ok(Ok(port)) => port,
        _ => 9092,
    }
}
139
/// Returns the metrics port.
///
/// This is the value of `ZERNEL_METRICS_PORT` if it parses as a `u16`,
/// and `9091` otherwise (including when the variable is unset).
pub fn metrics_port() -> u16 {
    const DEFAULT_PORT: u16 = 9091;

    let Ok(raw) = std::env::var("ZERNEL_METRICS_PORT") else {
        return DEFAULT_PORT;
    };
    raw.parse::<u16>().unwrap_or(DEFAULT_PORT)
}