1#![allow(dead_code)]
8
9mod aggregation;
13mod alerts;
14mod consumers;
15mod fallback;
16mod gpu_watchdog;
17mod loader;
18mod metrics_server;
19mod nccl_priority;
20mod power;
21mod prefetch;
22mod simulator;
23mod websocket_server;
24
25use aggregation::AggregatedMetrics;
26use anyhow::Result;
27use clap::Parser;
28use std::sync::Arc;
29use tokio::sync::RwLock;
30use tracing::info;
31
32#[derive(Parser)]
33#[command(name = "zerneld")]
34#[command(about = "Zernel eBPF Observability Daemon")]
35#[command(version)]
36struct Args {
37 #[arg(long, default_value = "9091", env = "ZERNEL_METRICS_PORT")]
39 metrics_port: u16,
40
41 #[arg(long, default_value = "9092", env = "ZERNEL_WS_PORT")]
43 ws_port: u16,
44
45 #[arg(long, default_value = "1000", env = "ZERNEL_PUSH_INTERVAL_MS")]
47 push_interval_ms: u64,
48
49 #[arg(long)]
51 simulate: bool,
52}
53
/// Where the daemon's telemetry comes from.
///
/// The `Display` form is a short, log-friendly label for each variant.
#[derive(Clone, Copy, Debug)]
enum TelemetrySource {
    /// Displayed as `BPF`.
    Bpf,
    /// Displayed as `nvidia-smi`.
    NvidiaSmi,
    /// Displayed as `simulator`.
    Simulator,
}

impl std::fmt::Display for TelemetrySource {
    /// Writes the variant's label verbatim; width/fill flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Bpf => "BPF",
            Self::NvidiaSmi => "nvidia-smi",
            Self::Simulator => "simulator",
        };
        f.write_str(label)
    }
}
74
75#[tokio::main]
76async fn main() -> Result<()> {
77 tracing_subscriber::fmt()
78 .with_env_filter(std::env::var("ZERNEL_LOG").unwrap_or_else(|_| "zernel_ebpf=info".into()))
79 .init();
80
81 let args = Args::parse();
82
83 info!("zerneld v{}", env!("CARGO_PKG_VERSION"));
84
85 let source = if args.simulate {
93 TelemetrySource::Simulator
94 } else {
95 let load_result = loader::load_all_probes().unwrap_or_else(|e| {
97 tracing::warn!("BPF probe loading failed: {e}");
98 loader::LoadResult {
99 status: loader::ProbeStatus::none(),
100 }
101 });
102
103 if load_result.status.active_count() > 0 {
104 info!(
105 active = load_result.status.active_count(),
106 "BPF probes loaded"
107 );
108 TelemetrySource::Bpf
109 } else if fallback::nvidia_smi_available() {
110 info!("BPF unavailable, using nvidia-smi fallback for real GPU metrics");
111 TelemetrySource::NvidiaSmi
112 } else {
113 info!("no BPF or nvidia-smi available, using simulator");
114 TelemetrySource::Simulator
115 }
116 };
117
118 info!(source = %source, "telemetry source selected");
119
120 let metrics = Arc::new(RwLock::new(AggregatedMetrics::default()));
122
123 match source {
125 TelemetrySource::Bpf => {
126 info!("BPF ring buffer polling active");
130 let fb_metrics = Arc::clone(&metrics);
131 tokio::spawn(async move {
132 fallback::run_fallback(fb_metrics, 1000).await;
133 });
134 }
135 TelemetrySource::NvidiaSmi => {
136 let fb_metrics = Arc::clone(&metrics);
137 tokio::spawn(async move {
138 fallback::run_fallback(fb_metrics, 1000).await;
139 });
140 }
141 TelemetrySource::Simulator => {
142 let sim_metrics = Arc::clone(&metrics);
143 tokio::spawn(async move {
144 simulator::run_simulator(sim_metrics, 500).await;
145 });
146 }
147 }
148
149 let alert_metrics = Arc::clone(&metrics);
151 let alert_engine = alerts::AlertEngine::new(vec![alerts::AlertRule {
152 name: "gpu_oom_warning".into(),
153 metric: "gpu_memory_used_pct".into(),
154 threshold: 95.0,
155 comparison: alerts::Comparison::GreaterThan,
156 action: alerts::AlertAction::Log,
157 }]);
158 tokio::spawn(async move {
159 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
160 loop {
161 interval.tick().await;
162 let m = alert_metrics.read().await;
163 for gpu in m.gpu_memory.values() {
164 if gpu.peak_bytes > 0 {
165 let used_pct = gpu.current_bytes as f64 / gpu.peak_bytes as f64 * 100.0;
166 alert_engine.evaluate("gpu_memory_used_pct", used_pct);
167 }
168 }
169 }
170 });
171
172 let metrics_srv = metrics_server::MetricsServer::new(Arc::clone(&metrics), args.metrics_port);
174 let ws_srv = websocket_server::WebSocketServer::new(
175 Arc::clone(&metrics),
176 args.ws_port,
177 args.push_interval_ms,
178 );
179
180 info!(
181 metrics_port = args.metrics_port,
182 ws_port = args.ws_port,
183 source = %source,
184 "zerneld ready"
185 );
186
187 tokio::select! {
188 res = metrics_srv.serve() => {
189 if let Err(e) = res {
190 tracing::error!("metrics server error: {e}");
191 }
192 }
193 res = ws_srv.serve() => {
194 if let Err(e) = res {
195 tracing::error!("WebSocket server error: {e}");
196 }
197 }
198 _ = tokio::signal::ctrl_c() => {
199 info!("shutting down");
200 }
201 }
202
203 Ok(())
204}