zernel_ebpf/
main.rs

// Copyright (C) 2026 Dyber, Inc. — GPL-2.0
//
// zerneld — Zernel eBPF Observability Daemon
//
// Allow dead code: probe consumers and event types are public API surface
// that is only used once BPF probes are wired up on Linux.
7#![allow(dead_code)]
8
// Loads eBPF probes, consumes events from ring buffers (still a TODO), aggregates
// metrics, and exposes them via a Prometheus endpoint and a WebSocket stream for the CLI IDE.
11
12mod aggregation;
13mod alerts;
14mod consumers;
15mod fallback;
16mod gpu_watchdog;
17mod loader;
18mod metrics_server;
19mod nccl_priority;
20mod power;
21mod prefetch;
22mod simulator;
23mod websocket_server;
24
25use aggregation::AggregatedMetrics;
26use anyhow::Result;
27use clap::Parser;
28use std::sync::Arc;
29use tokio::sync::RwLock;
30use tracing::info;
31
32#[derive(Parser)]
33#[command(name = "zerneld")]
34#[command(about = "Zernel eBPF Observability Daemon")]
35#[command(version)]
36struct Args {
37    /// Prometheus metrics HTTP port
38    #[arg(long, default_value = "9091", env = "ZERNEL_METRICS_PORT")]
39    metrics_port: u16,
40
41    /// WebSocket telemetry stream port
42    #[arg(long, default_value = "9092", env = "ZERNEL_WS_PORT")]
43    ws_port: u16,
44
45    /// WebSocket push interval in milliseconds
46    #[arg(long, default_value = "1000", env = "ZERNEL_PUSH_INTERVAL_MS")]
47    push_interval_ms: u64,
48
49    /// Force simulated telemetry (ignore BPF and nvidia-smi)
50    #[arg(long)]
51    simulate: bool,
52}
53
/// Backend that feeds telemetry into the shared metrics state.
///
/// Selected once at startup by `main`.
#[derive(Clone, Copy, Debug)]
enum TelemetrySource {
    /// Events delivered by kernel BPF probes over ring buffers.
    Bpf,
    /// Polling `nvidia-smi` and `/proc`: real GPU data without BPF.
    NvidiaSmi,
    /// Synthetic data for development and demos.
    Simulator,
}

impl std::fmt::Display for TelemetrySource {
    /// Writes the short source label used in log fields (no padding applied).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Bpf => "BPF",
            Self::NvidiaSmi => "nvidia-smi",
            Self::Simulator => "simulator",
        };
        f.write_str(label)
    }
}
74
75#[tokio::main]
76async fn main() -> Result<()> {
77    tracing_subscriber::fmt()
78        .with_env_filter(std::env::var("ZERNEL_LOG").unwrap_or_else(|_| "zernel_ebpf=info".into()))
79        .init();
80
81    let args = Args::parse();
82
83    info!("zerneld v{}", env!("CARGO_PKG_VERSION"));
84
85    // ============================================================
86    // Three-tier telemetry source selection:
87    //   1. BPF ring buffers (Linux 6.12+ with root)
88    //   2. nvidia-smi polling (any system with NVIDIA GPU)
89    //   3. Simulator (development/demo)
90    // ============================================================
91
92    let source = if args.simulate {
93        TelemetrySource::Simulator
94    } else {
95        // Try BPF first
96        let load_result = loader::load_all_probes().unwrap_or_else(|e| {
97            tracing::warn!("BPF probe loading failed: {e}");
98            loader::LoadResult {
99                status: loader::ProbeStatus::none(),
100            }
101        });
102
103        if load_result.status.active_count() > 0 {
104            info!(
105                active = load_result.status.active_count(),
106                "BPF probes loaded"
107            );
108            TelemetrySource::Bpf
109        } else if fallback::nvidia_smi_available() {
110            info!("BPF unavailable, using nvidia-smi fallback for real GPU metrics");
111            TelemetrySource::NvidiaSmi
112        } else {
113            info!("no BPF or nvidia-smi available, using simulator");
114            TelemetrySource::Simulator
115        }
116    };
117
118    info!(source = %source, "telemetry source selected");
119
120    // Shared metrics state
121    let metrics = Arc::new(RwLock::new(AggregatedMetrics::default()));
122
123    // Start the appropriate telemetry provider
124    match source {
125        TelemetrySource::Bpf => {
126            // TODO: When full BPF skeleton loading is wired up, spawn ring
127            // buffer polling tasks here that read events and feed into metrics.
128            // For now, BPF is detected but we fall through to fallback behavior.
129            info!("BPF ring buffer polling active");
130            let fb_metrics = Arc::clone(&metrics);
131            tokio::spawn(async move {
132                fallback::run_fallback(fb_metrics, 1000).await;
133            });
134        }
135        TelemetrySource::NvidiaSmi => {
136            let fb_metrics = Arc::clone(&metrics);
137            tokio::spawn(async move {
138                fallback::run_fallback(fb_metrics, 1000).await;
139            });
140        }
141        TelemetrySource::Simulator => {
142            let sim_metrics = Arc::clone(&metrics);
143            tokio::spawn(async move {
144                simulator::run_simulator(sim_metrics, 500).await;
145            });
146        }
147    }
148
149    // Alert engine
150    let alert_metrics = Arc::clone(&metrics);
151    let alert_engine = alerts::AlertEngine::new(vec![alerts::AlertRule {
152        name: "gpu_oom_warning".into(),
153        metric: "gpu_memory_used_pct".into(),
154        threshold: 95.0,
155        comparison: alerts::Comparison::GreaterThan,
156        action: alerts::AlertAction::Log,
157    }]);
158    tokio::spawn(async move {
159        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
160        loop {
161            interval.tick().await;
162            let m = alert_metrics.read().await;
163            for gpu in m.gpu_memory.values() {
164                if gpu.peak_bytes > 0 {
165                    let used_pct = gpu.current_bytes as f64 / gpu.peak_bytes as f64 * 100.0;
166                    alert_engine.evaluate("gpu_memory_used_pct", used_pct);
167                }
168            }
169        }
170    });
171
172    // Start servers
173    let metrics_srv = metrics_server::MetricsServer::new(Arc::clone(&metrics), args.metrics_port);
174    let ws_srv = websocket_server::WebSocketServer::new(
175        Arc::clone(&metrics),
176        args.ws_port,
177        args.push_interval_ms,
178    );
179
180    info!(
181        metrics_port = args.metrics_port,
182        ws_port = args.ws_port,
183        source = %source,
184        "zerneld ready"
185    );
186
187    tokio::select! {
188        res = metrics_srv.serve() => {
189            if let Err(e) = res {
190                tracing::error!("metrics server error: {e}");
191            }
192        }
193        res = ws_srv.serve() => {
194            if let Err(e) = res {
195                tracing::error!("WebSocket server error: {e}");
196            }
197        }
198        _ = tokio::signal::ctrl_c() => {
199            info!("shutting down");
200        }
201    }
202
203    Ok(())
204}