//! zernel_scheduler/telemetry.rs — telemetry export for the zernel scheduler.

use crate::scheduler::ZernelScheduler;
use crate::task_state::WorkloadPhase;
use serde::Serialize;
use std::fmt::Write;
/// Point-in-time snapshot of scheduler state, built by [`export_telemetry`]
/// and serialized for external monitoring.
#[derive(Debug, Serialize)]
pub struct SchedulerTelemetry {
    /// Number of tasks currently tracked by the scheduler.
    pub total_tracked_tasks: usize,
    /// Subset of tracked tasks flagged as ML processes.
    pub ml_tasks: usize,
    /// Cumulative scheduling decisions made (monotonic counter).
    pub decisions_made: u64,
    /// Cumulative workload-phase transitions observed (monotonic counter).
    pub phase_transitions: u64,
    /// Count of tasks currently in each workload phase.
    pub phase_distribution: PhaseDistribution,
    /// Share of aggregate phase time spent in each phase, in percent.
    pub phase_time_pct: PhaseTimePct,
    /// Number of NUMA nodes reported by the scheduler's topology.
    pub numa_nodes: usize,
    /// Total CPUs across all NUMA nodes.
    pub total_cpus: usize,
}
19
/// Per-phase head count: how many tracked tasks are currently in each
/// [`WorkloadPhase`].
#[derive(Debug, Default, Serialize)]
pub struct PhaseDistribution {
    /// Tasks currently in the data-loading phase.
    pub data_loading: usize,
    /// Tasks currently in the GPU-compute phase.
    pub gpu_compute: usize,
    /// Tasks currently in the NCCL collective-communication phase.
    pub nccl_collective: usize,
    /// Tasks currently in the optimizer-step phase.
    pub optimizer_step: usize,
    /// Tasks whose phase could not be classified.
    pub unknown: usize,
}
28
/// Share of aggregate phase time spent in each [`WorkloadPhase`], expressed
/// as percentages in `[0.0, 100.0]`. All-zero (the `Default`) when no phase
/// time has been recorded.
#[derive(Debug, Default, Serialize)]
pub struct PhaseTimePct {
    /// Percent of aggregate time spent loading data.
    pub data_loading: f64,
    /// Percent of aggregate time spent in GPU compute.
    pub gpu_compute: f64,
    /// Percent of aggregate time spent in NCCL collectives.
    pub nccl_collective: f64,
    /// Percent of aggregate time spent in optimizer steps.
    pub optimizer_step: f64,
    /// Percent of aggregate time in unclassified phases.
    pub unknown: f64,
}
38
39pub fn export_telemetry(scheduler: &ZernelScheduler) -> SchedulerTelemetry {
41 let states = scheduler.task_states();
42 let mut dist = PhaseDistribution::default();
43
44 let mut ml_count = 0;
45 let mut total_data_loading = 0u64;
46 let mut total_gpu_compute = 0u64;
47 let mut total_nccl = 0u64;
48 let mut total_optimizer = 0u64;
49 let mut total_unknown = 0u64;
50
51 for state in states.values() {
52 if state.is_ml_process {
53 ml_count += 1;
54 total_data_loading += state.phase_time_ns.data_loading_ns;
55 total_gpu_compute += state.phase_time_ns.gpu_compute_ns;
56 total_nccl += state.phase_time_ns.nccl_collective_ns;
57 total_optimizer += state.phase_time_ns.optimizer_step_ns;
58 total_unknown += state.phase_time_ns.unknown_ns;
59 }
60 match state.current_phase {
61 WorkloadPhase::DataLoading => dist.data_loading += 1,
62 WorkloadPhase::GpuCompute => dist.gpu_compute += 1,
63 WorkloadPhase::NcclCollective => dist.nccl_collective += 1,
64 WorkloadPhase::OptimizerStep => dist.optimizer_step += 1,
65 WorkloadPhase::Unknown => dist.unknown += 1,
66 }
67 }
68
69 let total_time =
70 total_data_loading + total_gpu_compute + total_nccl + total_optimizer + total_unknown;
71 let pct = if total_time > 0 {
72 PhaseTimePct {
73 data_loading: total_data_loading as f64 / total_time as f64 * 100.0,
74 gpu_compute: total_gpu_compute as f64 / total_time as f64 * 100.0,
75 nccl_collective: total_nccl as f64 / total_time as f64 * 100.0,
76 optimizer_step: total_optimizer as f64 / total_time as f64 * 100.0,
77 unknown: total_unknown as f64 / total_time as f64 * 100.0,
78 }
79 } else {
80 PhaseTimePct::default()
81 };
82
83 let numa = scheduler.numa_topology();
84
85 SchedulerTelemetry {
86 total_tracked_tasks: states.len(),
87 ml_tasks: ml_count,
88 decisions_made: scheduler.decisions_made,
89 phase_transitions: scheduler.phase_transition_count(),
90 phase_distribution: dist,
91 phase_time_pct: pct,
92 numa_nodes: numa.nodes.len(),
93 total_cpus: numa.total_cpus(),
94 }
95}
96
97pub fn format_prometheus(telem: &SchedulerTelemetry) -> String {
99 let mut out = String::new();
100
101 out.push_str(&format!(
102 "# HELP zernel_scheduler_tasks Total tracked tasks\n\
103 # TYPE zernel_scheduler_tasks gauge\n\
104 zernel_scheduler_tasks{{type=\"total\"}} {}\n\
105 zernel_scheduler_tasks{{type=\"ml\"}} {}\n",
106 telem.total_tracked_tasks, telem.ml_tasks,
107 ));
108
109 out.push_str(&format!(
110 "# HELP zernel_scheduler_decisions_total Total scheduling decisions\n\
111 # TYPE zernel_scheduler_decisions_total counter\n\
112 zernel_scheduler_decisions_total {}\n",
113 telem.decisions_made,
114 ));
115
116 out.push_str(&format!(
117 "# HELP zernel_scheduler_phase_transitions_total Phase transition count\n\
118 # TYPE zernel_scheduler_phase_transitions_total counter\n\
119 zernel_scheduler_phase_transitions_total {}\n",
120 telem.phase_transitions,
121 ));
122
123 out.push_str(&format!(
124 "# HELP zernel_scheduler_phase_tasks Current tasks per phase\n\
125 # TYPE zernel_scheduler_phase_tasks gauge\n\
126 zernel_scheduler_phase_tasks{{phase=\"data_loading\"}} {}\n\
127 zernel_scheduler_phase_tasks{{phase=\"gpu_compute\"}} {}\n\
128 zernel_scheduler_phase_tasks{{phase=\"nccl_collective\"}} {}\n\
129 zernel_scheduler_phase_tasks{{phase=\"optimizer_step\"}} {}\n\
130 zernel_scheduler_phase_tasks{{phase=\"unknown\"}} {}\n",
131 telem.phase_distribution.data_loading,
132 telem.phase_distribution.gpu_compute,
133 telem.phase_distribution.nccl_collective,
134 telem.phase_distribution.optimizer_step,
135 telem.phase_distribution.unknown,
136 ));
137
138 out.push_str(&format!(
139 "# HELP zernel_scheduler_phase_time_pct Aggregate phase time percentage\n\
140 # TYPE zernel_scheduler_phase_time_pct gauge\n\
141 zernel_scheduler_phase_time_pct{{phase=\"data_loading\"}} {:.2}\n\
142 zernel_scheduler_phase_time_pct{{phase=\"gpu_compute\"}} {:.2}\n\
143 zernel_scheduler_phase_time_pct{{phase=\"nccl_collective\"}} {:.2}\n\
144 zernel_scheduler_phase_time_pct{{phase=\"optimizer_step\"}} {:.2}\n\
145 zernel_scheduler_phase_time_pct{{phase=\"unknown\"}} {:.2}\n",
146 telem.phase_time_pct.data_loading,
147 telem.phase_time_pct.gpu_compute,
148 telem.phase_time_pct.nccl_collective,
149 telem.phase_time_pct.optimizer_step,
150 telem.phase_time_pct.unknown,
151 ));
152
153 out
154}