use serde::Serialize;

use std::collections::HashMap;
// Trait import (anonymous) so `write!`/`writeln!` can append into a String.
use std::fmt::Write as _;
5
/// Rolling aggregation of profiler metrics, keyed per process / GPU / op.
///
/// Populated by the `record_*` methods and rendered either as Prometheus
/// text (`to_prometheus`) or as a JSON WebSocket snapshot (`to_ws_snapshot`).
#[derive(Debug, Default, Clone, Serialize)]
pub struct AggregatedMetrics {
    /// GPU memory counters keyed by a `"pid:gpu_id"` string
    /// (format produced by `record_gpu_mem`).
    pub gpu_memory: HashMap<String, GpuMemMetrics>,
    /// CUDA kernel-launch latency histograms keyed by pid.
    pub cuda_latency: HashMap<u32, LatencyHistogram>,
    /// NCCL collective duration histograms keyed by op name
    /// (e.g. "all_reduce", as read back by `to_ws_snapshot`).
    pub nccl_duration: HashMap<String, LatencyHistogram>,
    /// DataLoader wait-time histograms keyed by pid.
    pub dataloader_wait: HashMap<u32, LatencyHistogram>,
    /// Distributed-synchronization wait histograms keyed by pid.
    pub dist_sync: HashMap<u32, LatencyHistogram>,
    /// Timestamp of the last update, presumably epoch milliseconds.
    /// NOTE(review): not written by any method in this file — confirm the
    /// collector loop sets it.
    pub last_update_ms: u64,
}
17
18impl AggregatedMetrics {
19 pub fn record_gpu_mem(&mut self, pid: u32, gpu_id: u32, used: u64, peak: u64) {
21 let key = format!("{pid}:{gpu_id}");
22 let entry = self.gpu_memory.entry(key).or_default();
23 entry.current_bytes = used;
24 if peak > entry.peak_bytes {
25 entry.peak_bytes = peak;
26 }
27 entry.alloc_count += 1;
28 }
29
30 pub fn record_cuda_latency(&mut self, pid: u32, latency_ns: u64) {
32 self.cuda_latency.entry(pid).or_default().record(latency_ns);
33 }
34
35 pub fn record_nccl(&mut self, op: &str, duration_ns: u64) {
37 self.nccl_duration
38 .entry(op.to_string())
39 .or_default()
40 .record(duration_ns);
41 }
42
43 pub fn record_dataloader_wait(&mut self, pid: u32, wait_ns: u64) {
45 self.dataloader_wait.entry(pid).or_default().record(wait_ns);
46 }
47
48 pub fn to_prometheus(&self) -> String {
50 let mut out = String::with_capacity(4096);
51
52 out.push_str("# HELP zernel_gpu_memory_used_bytes Current GPU memory usage\n");
54 out.push_str("# TYPE zernel_gpu_memory_used_bytes gauge\n");
55 for (key, m) in &self.gpu_memory {
56 let parts: Vec<&str> = key.split(':').collect();
57 if parts.len() == 2 {
58 out.push_str(&format!(
59 "zernel_gpu_memory_used_bytes{{pid=\"{}\",gpu_id=\"{}\"}} {}\n",
60 parts[0], parts[1], m.current_bytes
61 ));
62 }
63 }
64
65 out.push_str("# HELP zernel_gpu_memory_peak_bytes Peak GPU memory usage\n");
66 out.push_str("# TYPE zernel_gpu_memory_peak_bytes gauge\n");
67 for (key, m) in &self.gpu_memory {
68 let parts: Vec<&str> = key.split(':').collect();
69 if parts.len() == 2 {
70 out.push_str(&format!(
71 "zernel_gpu_memory_peak_bytes{{pid=\"{}\",gpu_id=\"{}\"}} {}\n",
72 parts[0], parts[1], m.peak_bytes
73 ));
74 }
75 }
76
77 out.push_str("# HELP zernel_cuda_launch_latency_seconds CUDA kernel launch latency\n");
79 out.push_str("# TYPE zernel_cuda_launch_latency_seconds summary\n");
80 for (pid, h) in &self.cuda_latency {
81 out.push_str(&format!(
82 "zernel_cuda_launch_latency_seconds{{pid=\"{pid}\",quantile=\"0.5\"}} {:.6}\n",
83 h.p50_ns as f64 / 1e9
84 ));
85 out.push_str(&format!(
86 "zernel_cuda_launch_latency_seconds{{pid=\"{pid}\",quantile=\"0.99\"}} {:.6}\n",
87 h.p99_ns as f64 / 1e9
88 ));
89 out.push_str(&format!(
90 "zernel_cuda_launch_latency_seconds_count{{pid=\"{pid}\"}} {}\n",
91 h.count
92 ));
93 }
94
95 out.push_str("# HELP zernel_nccl_collective_duration_seconds NCCL collective duration\n");
97 out.push_str("# TYPE zernel_nccl_collective_duration_seconds summary\n");
98 for (op, h) in &self.nccl_duration {
99 out.push_str(&format!(
100 "zernel_nccl_collective_duration_seconds{{op=\"{op}\",quantile=\"0.5\"}} {:.6}\n",
101 h.p50_ns as f64 / 1e9
102 ));
103 out.push_str(&format!(
104 "zernel_nccl_collective_duration_seconds{{op=\"{op}\",quantile=\"0.99\"}} {:.6}\n",
105 h.p99_ns as f64 / 1e9
106 ));
107 }
108
109 out.push_str("# HELP zernel_dataloader_wait_seconds DataLoader wait time\n");
111 out.push_str("# TYPE zernel_dataloader_wait_seconds summary\n");
112 for (pid, h) in &self.dataloader_wait {
113 out.push_str(&format!(
114 "zernel_dataloader_wait_seconds{{pid=\"{pid}\",quantile=\"0.5\"}} {:.6}\n",
115 h.p50_ns as f64 / 1e9
116 ));
117 out.push_str(&format!(
118 "zernel_dataloader_wait_seconds{{pid=\"{pid}\",quantile=\"0.99\"}} {:.6}\n",
119 h.p99_ns as f64 / 1e9
120 ));
121 }
122
123 out
124 }
125
126 pub fn to_ws_snapshot(&self) -> serde_json::Value {
128 serde_json::json!({
129 "gpu_utilization": self.gpu_memory.iter().map(|(k, v)| {
130 serde_json::json!({
131 "key": k,
132 "current_bytes": v.current_bytes,
133 "peak_bytes": v.peak_bytes,
134 })
135 }).collect::<Vec<_>>(),
136 "cuda_latency_p50_us": self.cuda_latency.values().next()
137 .map(|h| h.p50_ns as f64 / 1000.0).unwrap_or(0.0),
138 "cuda_latency_p99_us": self.cuda_latency.values().next()
139 .map(|h| h.p99_ns as f64 / 1000.0).unwrap_or(0.0),
140 "nccl_allreduce_p50_ms": self.nccl_duration.get("all_reduce")
141 .map(|h| h.p50_ns as f64 / 1e6).unwrap_or(0.0),
142 "nccl_allreduce_p99_ms": self.nccl_duration.get("all_reduce")
143 .map(|h| h.p99_ns as f64 / 1e6).unwrap_or(0.0),
144 "dataloader_wait_p50_ms": self.dataloader_wait.values().next()
145 .map(|h| h.p50_ns as f64 / 1e6).unwrap_or(0.0),
146 "last_update_ms": self.last_update_ms,
147 })
148 }
149}
150
/// GPU memory counters for one "pid:gpu_id" key in `AggregatedMetrics`.
#[derive(Debug, Default, Clone, Serialize)]
pub struct GpuMemMetrics {
    /// Most recently reported usage, in bytes (overwritten each sample).
    pub current_bytes: u64,
    /// High-water mark, in bytes; only ever ratchets upward.
    pub peak_bytes: u64,
    /// Number of recorded allocation events (one per `record_gpu_mem` call).
    pub alloc_count: u64,
    /// Number of recorded free events.
    /// NOTE(review): never incremented anywhere in this file — verify the
    /// deallocation path actually updates it.
    pub free_count: u64,
}
158
/// Latency summary: exact count/sum/min/max plus periodically refreshed
/// p50/p99 (see `record` — percentiles can lag behind `count`).
#[derive(Debug, Clone, Serialize)]
pub struct LatencyHistogram {
    /// Total number of recorded samples.
    pub count: u64,
    /// Sum of all samples, nanoseconds.
    pub sum_ns: u64,
    /// Smallest sample seen; `u64::MAX` until the first sample arrives.
    pub min_ns: u64,
    /// Largest sample seen.
    pub max_ns: u64,
    /// Median as of the last percentile recomputation.
    pub p50_ns: u64,
    /// 99th percentile as of the last percentile recomputation.
    pub p99_ns: u64,
    /// Raw sample buffer used to recompute percentiles; excluded from
    /// serialized output.
    #[serde(skip)]
    samples: Vec<u64>,
}
170
171impl Default for LatencyHistogram {
172 fn default() -> Self {
173 Self {
174 count: 0,
175 sum_ns: 0,
176 min_ns: u64::MAX,
177 max_ns: 0,
178 p50_ns: 0,
179 p99_ns: 0,
180 samples: Vec::new(),
181 }
182 }
183}
184
185impl LatencyHistogram {
186 pub fn record(&mut self, value_ns: u64) {
187 self.count += 1;
188 self.sum_ns += value_ns;
189 if value_ns < self.min_ns {
190 self.min_ns = value_ns;
191 }
192 if value_ns > self.max_ns {
193 self.max_ns = value_ns;
194 }
195 self.samples.push(value_ns);
196
197 if self.samples.len() <= 10 || self.samples.len().is_multiple_of(100) {
199 self.recompute_percentiles();
200 }
201 }
202
203 fn recompute_percentiles(&mut self) {
204 if self.samples.is_empty() {
205 return;
206 }
207 let mut sorted = self.samples.clone();
208 sorted.sort_unstable();
209 let len = sorted.len();
210 self.p50_ns = sorted[len / 2];
211 self.p99_ns = sorted[((len as f64 * 0.99) as usize).min(len - 1)];
212 }
213
214 pub fn mean_ns(&self) -> f64 {
215 if self.count == 0 {
216 0.0
217 } else {
218 self.sum_ns as f64 / self.count as f64
219 }
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn histogram_percentiles() {
        // 100 evenly spaced samples: 1_000, 2_000, ..., 100_000 ns.
        let mut hist = LatencyHistogram::default();
        for v in (1..=100).map(|i| i * 1000) {
            hist.record(v);
        }
        assert_eq!(hist.count, 100);
        assert_eq!(hist.min_ns, 1000);
        assert_eq!(hist.max_ns, 100_000);
        assert!(hist.p50_ns > 0);
        assert!(hist.p99_ns >= hist.p50_ns);
    }

    #[test]
    fn prometheus_format() {
        let mut metrics = AggregatedMetrics::default();
        metrics.record_gpu_mem(1234, 0, 84934656, 84934656);
        metrics.record_cuda_latency(1234, 142000);
        let rendered = metrics.to_prometheus();
        assert!(rendered.contains("zernel_gpu_memory_used_bytes"));
        assert!(rendered.contains("84934656"));
    }

    #[test]
    fn ws_snapshot_is_valid_json() {
        let snapshot = AggregatedMetrics::default().to_ws_snapshot();
        assert!(snapshot.is_object());
    }
}