zernel_ebpf/consumers/nccl.rs

1// Copyright (C) 2026 Dyber, Inc. — GPL-2.0
2
3use serde::Serialize;
4
5/// NCCL collective operation event.
6/// Layout must match struct zernel_nccl_event in common.h.
7#[derive(Debug, Clone, Serialize)]
8#[repr(C)]
9pub struct NcclEvent {
10    pub pid: u32,
11    pub op: u8,
12    pub _pad: [u8; 3],
13    pub size_bytes: u64,
14    pub start_ns: u64,
15    pub duration_ns: u64,
16    pub rank: u32,
17    pub num_ranks: u32,
18}
19
20#[derive(Debug, Serialize)]
21pub enum NcclOp {
22    AllReduce,
23    Broadcast,
24    AllGather,
25    ReduceScatter,
26    Unknown(u8),
27}
28
29impl From<u8> for NcclOp {
30    fn from(v: u8) -> Self {
31        match v {
32            0 => Self::AllReduce,
33            1 => Self::Broadcast,
34            2 => Self::AllGather,
35            3 => Self::ReduceScatter,
36            other => Self::Unknown(other),
37        }
38    }
39}
40
41impl NcclEvent {
42    pub fn op_name(&self) -> &'static str {
43        match self.op {
44            0 => "all_reduce",
45            1 => "broadcast",
46            2 => "all_gather",
47            3 => "reduce_scatter",
48            _ => "unknown",
49        }
50    }
51}
52
53pub struct NcclConsumer;
54
55impl NcclConsumer {
56    pub fn new() -> Self {
57        Self
58    }
59
60    pub fn process_event(&self, raw: &[u8]) -> Option<NcclEvent> {
61        if raw.len() < std::mem::size_of::<NcclEvent>() {
62            return None;
63        }
64        let event = unsafe { &*(raw.as_ptr() as *const NcclEvent) };
65        Some(event.clone())
66    }
67}
68
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event with the given opcode and fixed values for every other field.
    fn event_with_op(op: u8) -> NcclEvent {
        NcclEvent {
            pid: 1,
            op,
            _pad: [0; 3],
            size_bytes: 4096,
            start_ns: 0,
            duration_ns: 34_000_000,
            rank: 0,
            num_ranks: 8,
        }
    }

    /// Serializes `e` field by field in native byte order, mirroring the
    /// `#[repr(C)]` layout of `NcclEvent` (which has no implicit padding).
    fn encode(e: &NcclEvent) -> Vec<u8> {
        let mut buf = Vec::with_capacity(std::mem::size_of::<NcclEvent>());
        buf.extend_from_slice(&e.pid.to_ne_bytes());
        buf.push(e.op);
        buf.extend_from_slice(&e._pad);
        buf.extend_from_slice(&e.size_bytes.to_ne_bytes());
        buf.extend_from_slice(&e.start_ns.to_ne_bytes());
        buf.extend_from_slice(&e.duration_ns.to_ne_bytes());
        buf.extend_from_slice(&e.rank.to_ne_bytes());
        buf.extend_from_slice(&e.num_ranks.to_ne_bytes());
        buf
    }

    #[test]
    fn nccl_op_names() {
        assert_eq!(event_with_op(0).op_name(), "all_reduce");
        assert_eq!(event_with_op(1).op_name(), "broadcast");
        assert_eq!(event_with_op(2).op_name(), "all_gather");
        assert_eq!(event_with_op(3).op_name(), "reduce_scatter");
        assert_eq!(event_with_op(4).op_name(), "unknown");
        assert_eq!(event_with_op(u8::MAX).op_name(), "unknown");
    }

    #[test]
    fn nccl_op_from_u8() {
        assert!(matches!(NcclOp::from(0), NcclOp::AllReduce));
        assert!(matches!(NcclOp::from(1), NcclOp::Broadcast));
        assert!(matches!(NcclOp::from(2), NcclOp::AllGather));
        assert!(matches!(NcclOp::from(3), NcclOp::ReduceScatter));
        assert!(matches!(NcclOp::from(42), NcclOp::Unknown(42)));
    }

    #[test]
    fn process_event_rejects_short_input() {
        let consumer = NcclConsumer::new();
        assert!(consumer.process_event(&[]).is_none());
        let short = vec![0u8; std::mem::size_of::<NcclEvent>() - 1];
        assert!(consumer.process_event(&short).is_none());
    }

    #[test]
    fn process_event_round_trips_fields() {
        let original = NcclEvent {
            pid: 4242,
            op: 2,
            _pad: [0; 3],
            size_bytes: 1 << 20,
            start_ns: 123_456_789,
            duration_ns: 34_000_000,
            rank: 3,
            num_ranks: 8,
        };
        let mut bytes = encode(&original);
        assert_eq!(bytes.len(), std::mem::size_of::<NcclEvent>());
        // Trailing bytes past the record must be ignored.
        bytes.push(0xFF);

        let decoded = NcclConsumer::new()
            .process_event(&bytes)
            .expect("buffer holds a full record");
        assert_eq!(decoded.pid, original.pid);
        assert_eq!(decoded.op, original.op);
        assert_eq!(decoded._pad, original._pad);
        assert_eq!(decoded.size_bytes, original.size_bytes);
        assert_eq!(decoded.start_ns, original.start_ns);
        assert_eq!(decoded.duration_ns, original.duration_ns);
        assert_eq!(decoded.rank, original.rank);
        assert_eq!(decoded.num_ranks, original.num_ranks);
        assert_eq!(decoded.op_name(), "all_gather");
    }
}
87}