zernel_ebpf/consumers/
nccl.rs1use serde::Serialize;
4
5#[derive(Debug, Clone, Serialize)]
8#[repr(C)]
9pub struct NcclEvent {
10 pub pid: u32,
11 pub op: u8,
12 pub _pad: [u8; 3],
13 pub size_bytes: u64,
14 pub start_ns: u64,
15 pub duration_ns: u64,
16 pub rank: u32,
17 pub num_ranks: u32,
18}
19
20#[derive(Debug, Serialize)]
21pub enum NcclOp {
22 AllReduce,
23 Broadcast,
24 AllGather,
25 ReduceScatter,
26 Unknown(u8),
27}
28
29impl From<u8> for NcclOp {
30 fn from(v: u8) -> Self {
31 match v {
32 0 => Self::AllReduce,
33 1 => Self::Broadcast,
34 2 => Self::AllGather,
35 3 => Self::ReduceScatter,
36 other => Self::Unknown(other),
37 }
38 }
39}
40
41impl NcclEvent {
42 pub fn op_name(&self) -> &'static str {
43 match self.op {
44 0 => "all_reduce",
45 1 => "broadcast",
46 2 => "all_gather",
47 3 => "reduce_scatter",
48 _ => "unknown",
49 }
50 }
51}
52
53pub struct NcclConsumer;
54
55impl NcclConsumer {
56 pub fn new() -> Self {
57 Self
58 }
59
60 pub fn process_event(&self, raw: &[u8]) -> Option<NcclEvent> {
61 if raw.len() < std::mem::size_of::<NcclEvent>() {
62 return None;
63 }
64 let event = unsafe { &*(raw.as_ptr() as *const NcclEvent) };
65 Some(event.clone())
66 }
67}
68
#[cfg(test)]
mod tests {
    use super::*;
    use std::mem::size_of;

    /// Builds an event carrying `op`, with the remaining fields set to fixed sample values.
    fn event_with_op(op: u8) -> NcclEvent {
        NcclEvent {
            pid: 1,
            op,
            _pad: [0; 3],
            size_bytes: 4096,
            start_ns: 0,
            duration_ns: 34_000_000,
            rank: 0,
            num_ranks: 8,
        }
    }

    #[test]
    fn nccl_op_names() {
        // Every known code plus two unrecognised ones, so the fallback arm is covered.
        let expected = [
            (0u8, "all_reduce"),
            (1, "broadcast"),
            (2, "all_gather"),
            (3, "reduce_scatter"),
            (4, "unknown"),
            (u8::MAX, "unknown"),
        ];
        for (code, name) in expected {
            assert_eq!(event_with_op(code).op_name(), name, "op code {}", code);
        }
    }

    #[test]
    fn nccl_op_from_u8() {
        assert!(matches!(NcclOp::from(0), NcclOp::AllReduce));
        assert!(matches!(NcclOp::from(1), NcclOp::Broadcast));
        assert!(matches!(NcclOp::from(2), NcclOp::AllGather));
        assert!(matches!(NcclOp::from(3), NcclOp::ReduceScatter));
        assert!(matches!(NcclOp::from(200), NcclOp::Unknown(200)));
    }

    #[test]
    fn process_event_rejects_short_buffer() {
        let consumer = NcclConsumer::new();
        let short = vec![0u8; size_of::<NcclEvent>() - 1];
        assert!(consumer.process_event(&short).is_none());
        assert!(consumer.process_event(&[]).is_none());
    }
}