zernel_ebpf/
metrics_server.rs

1// Copyright (C) 2026 Dyber, Inc. — GPL-2.0
2
3use crate::aggregation::AggregatedMetrics;
4use anyhow::Result;
5use http_body_util::Full;
6use hyper::body::Bytes;
7use hyper::server::conn::http1;
8use hyper::service::service_fn;
9use hyper::{Request, Response, StatusCode};
10use hyper_util::rt::TokioIo;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use tokio::net::TcpListener;
14use tokio::sync::RwLock;
15use tracing::{error, info};
16
17/// Prometheus-compatible metrics server.
18pub struct MetricsServer {
19    metrics: Arc<RwLock<AggregatedMetrics>>,
20    port: u16,
21}
22
23impl MetricsServer {
24    pub fn new(metrics: Arc<RwLock<AggregatedMetrics>>, port: u16) -> Self {
25        Self { metrics, port }
26    }
27
28    pub async fn serve(&self) -> Result<()> {
29        let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
30        let listener = TcpListener::bind(addr).await?;
31        info!(port = self.port, "Prometheus metrics server listening");
32
33        let metrics = Arc::clone(&self.metrics);
34
35        loop {
36            let (stream, _) = listener.accept().await?;
37            let io = TokioIo::new(stream);
38            let metrics = Arc::clone(&metrics);
39
40            tokio::spawn(async move {
41                let service = service_fn(move |req: Request<hyper::body::Incoming>| {
42                    let metrics = Arc::clone(&metrics);
43                    async move { handle_request(req, metrics).await }
44                });
45
46                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
47                    error!("HTTP error: {err}");
48                }
49            });
50        }
51    }
52}
53
54async fn handle_request(
55    req: Request<hyper::body::Incoming>,
56    metrics: Arc<RwLock<AggregatedMetrics>>,
57) -> Result<Response<Full<Bytes>>, hyper::Error> {
58    match req.uri().path() {
59        "/metrics" => {
60            let m = metrics.read().await;
61            let body = m.to_prometheus();
62            Ok(Response::builder()
63                .header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
64                .body(Full::new(Bytes::from(body)))
65                .expect("valid response builder"))
66        }
67        "/health" => Ok(Response::new(Full::new(Bytes::from("ok")))),
68        "/json" => {
69            let m = metrics.read().await;
70            let body = serde_json::to_string_pretty(&*m).unwrap_or_default();
71            Ok(Response::builder()
72                .header("Content-Type", "application/json")
73                .body(Full::new(Bytes::from(body)))
74                .expect("valid response builder"))
75        }
76        _ => Ok(Response::builder()
77            .status(StatusCode::NOT_FOUND)
78            .body(Full::new(Bytes::from("not found")))
79            .expect("valid response builder")),
80    }
81}