//! `zernel_ebpf/aggregation.rs` — aggregates raw BPF events into exportable metrics.

1// Copyright (C) 2026 Dyber, Inc. — GPL-2.0
2
3use serde::Serialize;
4use std::collections::HashMap;
5
/// Aggregated metrics computed from raw BPF events.
///
/// One instance accumulates per-process / per-device statistics and can be
/// rendered as Prometheus text ([`Self::to_prometheus`]) or a JSON snapshot
/// ([`Self::to_ws_snapshot`]).
#[derive(Debug, Default, Clone, Serialize)]
pub struct AggregatedMetrics {
    // Per-(process, device) GPU memory stats, keyed by "<pid>:<gpu_id>"
    // (the key format produced by record_gpu_mem).
    pub gpu_memory: HashMap<String, GpuMemMetrics>,
    // CUDA kernel-launch latency histograms, keyed by pid.
    pub cuda_latency: HashMap<u32, LatencyHistogram>,
    // NCCL collective duration histograms, keyed by op name (e.g. "all_reduce").
    pub nccl_duration: HashMap<String, LatencyHistogram>,
    // DataLoader wait-time histograms, keyed by pid.
    pub dataloader_wait: HashMap<u32, LatencyHistogram>,
    // Distributed-sync wait histograms, keyed by u32 id (presumably pid).
    // NOTE(review): verify that something actually populates this map.
    pub dist_sync: HashMap<u32, LatencyHistogram>,
    /// Timestamp (ms since epoch) of last update.
    // NOTE(review): none of the record_* methods set this; presumably the
    // event-loop owner stamps it — confirm.
    pub last_update_ms: u64,
}
17
18impl AggregatedMetrics {
19    /// Record a GPU memory event.
20    pub fn record_gpu_mem(&mut self, pid: u32, gpu_id: u32, used: u64, peak: u64) {
21        let key = format!("{pid}:{gpu_id}");
22        let entry = self.gpu_memory.entry(key).or_default();
23        entry.current_bytes = used;
24        if peak > entry.peak_bytes {
25            entry.peak_bytes = peak;
26        }
27        entry.alloc_count += 1;
28    }
29
30    /// Record a CUDA kernel launch latency.
31    pub fn record_cuda_latency(&mut self, pid: u32, latency_ns: u64) {
32        self.cuda_latency.entry(pid).or_default().record(latency_ns);
33    }
34
35    /// Record an NCCL collective duration.
36    pub fn record_nccl(&mut self, op: &str, duration_ns: u64) {
37        self.nccl_duration
38            .entry(op.to_string())
39            .or_default()
40            .record(duration_ns);
41    }
42
43    /// Record a DataLoader wait time.
44    pub fn record_dataloader_wait(&mut self, pid: u32, wait_ns: u64) {
45        self.dataloader_wait.entry(pid).or_default().record(wait_ns);
46    }
47
48    /// Format as Prometheus text exposition.
49    pub fn to_prometheus(&self) -> String {
50        let mut out = String::with_capacity(4096);
51
52        // GPU Memory
53        out.push_str("# HELP zernel_gpu_memory_used_bytes Current GPU memory usage\n");
54        out.push_str("# TYPE zernel_gpu_memory_used_bytes gauge\n");
55        for (key, m) in &self.gpu_memory {
56            let parts: Vec<&str> = key.split(':').collect();
57            if parts.len() == 2 {
58                out.push_str(&format!(
59                    "zernel_gpu_memory_used_bytes{{pid=\"{}\",gpu_id=\"{}\"}} {}\n",
60                    parts[0], parts[1], m.current_bytes
61                ));
62            }
63        }
64
65        out.push_str("# HELP zernel_gpu_memory_peak_bytes Peak GPU memory usage\n");
66        out.push_str("# TYPE zernel_gpu_memory_peak_bytes gauge\n");
67        for (key, m) in &self.gpu_memory {
68            let parts: Vec<&str> = key.split(':').collect();
69            if parts.len() == 2 {
70                out.push_str(&format!(
71                    "zernel_gpu_memory_peak_bytes{{pid=\"{}\",gpu_id=\"{}\"}} {}\n",
72                    parts[0], parts[1], m.peak_bytes
73                ));
74            }
75        }
76
77        // CUDA Latency
78        out.push_str("# HELP zernel_cuda_launch_latency_seconds CUDA kernel launch latency\n");
79        out.push_str("# TYPE zernel_cuda_launch_latency_seconds summary\n");
80        for (pid, h) in &self.cuda_latency {
81            out.push_str(&format!(
82                "zernel_cuda_launch_latency_seconds{{pid=\"{pid}\",quantile=\"0.5\"}} {:.6}\n",
83                h.p50_ns as f64 / 1e9
84            ));
85            out.push_str(&format!(
86                "zernel_cuda_launch_latency_seconds{{pid=\"{pid}\",quantile=\"0.99\"}} {:.6}\n",
87                h.p99_ns as f64 / 1e9
88            ));
89            out.push_str(&format!(
90                "zernel_cuda_launch_latency_seconds_count{{pid=\"{pid}\"}} {}\n",
91                h.count
92            ));
93        }
94
95        // NCCL
96        out.push_str("# HELP zernel_nccl_collective_duration_seconds NCCL collective duration\n");
97        out.push_str("# TYPE zernel_nccl_collective_duration_seconds summary\n");
98        for (op, h) in &self.nccl_duration {
99            out.push_str(&format!(
100                "zernel_nccl_collective_duration_seconds{{op=\"{op}\",quantile=\"0.5\"}} {:.6}\n",
101                h.p50_ns as f64 / 1e9
102            ));
103            out.push_str(&format!(
104                "zernel_nccl_collective_duration_seconds{{op=\"{op}\",quantile=\"0.99\"}} {:.6}\n",
105                h.p99_ns as f64 / 1e9
106            ));
107        }
108
109        // DataLoader
110        out.push_str("# HELP zernel_dataloader_wait_seconds DataLoader wait time\n");
111        out.push_str("# TYPE zernel_dataloader_wait_seconds summary\n");
112        for (pid, h) in &self.dataloader_wait {
113            out.push_str(&format!(
114                "zernel_dataloader_wait_seconds{{pid=\"{pid}\",quantile=\"0.5\"}} {:.6}\n",
115                h.p50_ns as f64 / 1e9
116            ));
117            out.push_str(&format!(
118                "zernel_dataloader_wait_seconds{{pid=\"{pid}\",quantile=\"0.99\"}} {:.6}\n",
119                h.p99_ns as f64 / 1e9
120            ));
121        }
122
123        out
124    }
125
126    /// Build a JSON snapshot for WebSocket push to CLI.
127    pub fn to_ws_snapshot(&self) -> serde_json::Value {
128        serde_json::json!({
129            "gpu_utilization": self.gpu_memory.iter().map(|(k, v)| {
130                serde_json::json!({
131                    "key": k,
132                    "current_bytes": v.current_bytes,
133                    "peak_bytes": v.peak_bytes,
134                })
135            }).collect::<Vec<_>>(),
136            "cuda_latency_p50_us": self.cuda_latency.values().next()
137                .map(|h| h.p50_ns as f64 / 1000.0).unwrap_or(0.0),
138            "cuda_latency_p99_us": self.cuda_latency.values().next()
139                .map(|h| h.p99_ns as f64 / 1000.0).unwrap_or(0.0),
140            "nccl_allreduce_p50_ms": self.nccl_duration.get("all_reduce")
141                .map(|h| h.p50_ns as f64 / 1e6).unwrap_or(0.0),
142            "nccl_allreduce_p99_ms": self.nccl_duration.get("all_reduce")
143                .map(|h| h.p99_ns as f64 / 1e6).unwrap_or(0.0),
144            "dataloader_wait_p50_ms": self.dataloader_wait.values().next()
145                .map(|h| h.p50_ns as f64 / 1e6).unwrap_or(0.0),
146            "last_update_ms": self.last_update_ms,
147        })
148    }
149}
150
/// GPU memory statistics for one (pid, gpu) pair.
#[derive(Debug, Default, Clone, Serialize)]
pub struct GpuMemMetrics {
    // Most recently reported usage, in bytes.
    pub current_bytes: u64,
    // High-water mark, in bytes.
    pub peak_bytes: u64,
    // Events recorded (incremented once per record_gpu_mem call).
    pub alloc_count: u64,
    // Free events.
    // NOTE(review): nothing in this file increments free_count — confirm
    // a writer exists elsewhere.
    pub free_count: u64,
}
158
/// Online latency statistics with periodically refreshed p50/p99.
#[derive(Debug, Clone, Serialize)]
pub struct LatencyHistogram {
    // Number of samples recorded.
    pub count: u64,
    // Sum of all samples, in nanoseconds.
    pub sum_ns: u64,
    // Smallest sample seen; stays at u64::MAX until the first sample.
    pub min_ns: u64,
    // Largest sample seen.
    pub max_ns: u64,
    // Median; refreshed periodically by record(), so it can lag.
    pub p50_ns: u64,
    // 99th percentile; refreshed periodically by record(), so it can lag.
    pub p99_ns: u64,
    // Raw samples retained for percentile recomputation; not serialized.
    // NOTE(review): grows without bound for long-lived histograms.
    #[serde(skip)]
    samples: Vec<u64>,
}
170
171impl Default for LatencyHistogram {
172    fn default() -> Self {
173        Self {
174            count: 0,
175            sum_ns: 0,
176            min_ns: u64::MAX,
177            max_ns: 0,
178            p50_ns: 0,
179            p99_ns: 0,
180            samples: Vec::new(),
181        }
182    }
183}
184
185impl LatencyHistogram {
186    pub fn record(&mut self, value_ns: u64) {
187        self.count += 1;
188        self.sum_ns += value_ns;
189        if value_ns < self.min_ns {
190            self.min_ns = value_ns;
191        }
192        if value_ns > self.max_ns {
193            self.max_ns = value_ns;
194        }
195        self.samples.push(value_ns);
196
197        // Recompute percentiles every 100 samples or on first 10
198        if self.samples.len() <= 10 || self.samples.len().is_multiple_of(100) {
199            self.recompute_percentiles();
200        }
201    }
202
203    fn recompute_percentiles(&mut self) {
204        if self.samples.is_empty() {
205            return;
206        }
207        let mut sorted = self.samples.clone();
208        sorted.sort_unstable();
209        let len = sorted.len();
210        self.p50_ns = sorted[len / 2];
211        self.p99_ns = sorted[((len as f64 * 0.99) as usize).min(len - 1)];
212    }
213
214    pub fn mean_ns(&self) -> f64 {
215        if self.count == 0 {
216            0.0
217        } else {
218            self.sum_ns as f64 / self.count as f64
219        }
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    // 100 uniform samples: the 100th record() triggers a recompute
    // (multiple of 100), so percentiles are exact and assertable.
    #[test]
    fn histogram_percentiles() {
        let mut h = LatencyHistogram::default();
        for i in 1..=100u64 {
            h.record(i * 1000);
        }
        assert_eq!(h.count, 100);
        assert_eq!(h.min_ns, 1000);
        assert_eq!(h.max_ns, 100_000);
        // sorted[len / 2] == sorted[50] == 51_000; sorted[99] == 100_000.
        assert_eq!(h.p50_ns, 51_000);
        assert_eq!(h.p99_ns, 100_000);
        // sum = 1000 * 5050, count = 100 — exact in f64.
        assert_eq!(h.mean_ns(), 50_500.0);
    }

    // The empty histogram must not divide by zero.
    #[test]
    fn empty_histogram_mean_is_zero() {
        let h = LatencyHistogram::default();
        assert_eq!(h.count, 0);
        assert_eq!(h.mean_ns(), 0.0);
    }

    // Pin the exact label format, not just substring presence.
    #[test]
    fn prometheus_format() {
        let mut m = AggregatedMetrics::default();
        m.record_gpu_mem(1234, 0, 84934656, 84934656);
        m.record_cuda_latency(1234, 142000);
        let prom = m.to_prometheus();
        assert!(prom.contains("zernel_gpu_memory_used_bytes{pid=\"1234\",gpu_id=\"0\"} 84934656"));
        assert!(prom.contains("zernel_gpu_memory_peak_bytes{pid=\"1234\",gpu_id=\"0\"} 84934656"));
        assert!(prom.contains("zernel_cuda_launch_latency_seconds_count{pid=\"1234\"} 1"));
    }

    #[test]
    fn ws_snapshot_is_valid_json() {
        let m = AggregatedMetrics::default();
        let snap = m.to_ws_snapshot();
        assert!(snap.is_object());
        assert_eq!(snap["last_update_ms"], 0);
    }
}
256}