// zernel/commands/run.rs
// Copyright (C) 2026 Dyber, Inc. — Proprietary

3use crate::experiments::store::{Experiment, ExperimentStatus, ExperimentStore};
4use crate::experiments::tracker::{self, MetricExtractor};
5use anyhow::{Context, Result};
6use chrono::Utc;
7use std::collections::HashMap;
8use std::process::Stdio;
9use tokio::io::{AsyncBufReadExt, BufReader};
10
11/// Run a training script with automatic telemetry and experiment tracking.
12pub async fn run(script: &str, args: &[String]) -> Result<()> {
13    let db_path = tracker::experiments_db_path();
14    let store = ExperimentStore::open(&db_path)?;
15    let extractor = MetricExtractor::new();
16
17    let exp_id = tracker::generate_experiment_id();
18    let exp_name = std::path::Path::new(script)
19        .file_stem()
20        .map(|s| s.to_string_lossy().to_string())
21        .unwrap_or_else(|| "unnamed".into());
22
23    let git_commit = std::process::Command::new("git")
24        .args(["rev-parse", "--short", "HEAD"])
25        .output()
26        .ok()
27        .and_then(|o| {
28            if o.status.success() {
29                Some(String::from_utf8_lossy(&o.stdout).trim().to_string())
30            } else {
31                None
32            }
33        });
34
35    let experiment = Experiment {
36        id: exp_id.clone(),
37        name: exp_name.clone(),
38        status: ExperimentStatus::Running,
39        hyperparams: HashMap::new(),
40        metrics: HashMap::new(),
41        created_at: Utc::now(),
42        finished_at: None,
43        git_commit,
44        script: Some(script.to_string()),
45        duration_secs: None,
46    };
47
48    store.insert(&experiment)?;
49
50    println!("Zernel Run");
51    println!("  Experiment: {exp_id}");
52    println!("  Script:     {script}");
53    println!("  Tracking:   enabled");
54    println!();
55
56    let python = detect_python();
57    let start = std::time::Instant::now();
58
59    let mut child = tokio::process::Command::new(&python)
60        .arg(script)
61        .args(args)
62        .stdout(Stdio::piped())
63        .stderr(Stdio::piped())
64        .spawn()
65        .with_context(|| format!("failed to launch: {python} {script}"))?;
66
67    let stdout = child
68        .stdout
69        .take()
70        .ok_or_else(|| anyhow::anyhow!("failed to capture child stdout"))?;
71    let stderr = child
72        .stderr
73        .take()
74        .ok_or_else(|| anyhow::anyhow!("failed to capture child stderr"))?;
75
76    let exp_id_clone = exp_id.clone();
77    let db_path_clone = db_path.clone();
78
79    // Create log directory and file
80    let log_dir = tracker::zernel_dir().join("experiments").join(&exp_id);
81    std::fs::create_dir_all(&log_dir).ok();
82    let log_path = log_dir.join("output.log");
83
84    // Process stdout in a spawned task
85    let stdout_handle = tokio::spawn(async move {
86        let mut latest_metrics: HashMap<String, f64> = HashMap::new();
87        let mut lines_processed = 0u64;
88        let mut reader = BufReader::new(stdout);
89        let mut line = String::new();
90        let mut log_file = std::fs::File::create(&log_path).ok();
91
92        let store = ExperimentStore::open(&db_path_clone).ok();
93
94        loop {
95            line.clear();
96            match reader.read_line(&mut line).await {
97                Ok(0) => break,
98                Ok(_) => {
99                    let trimmed = line.trim_end();
100                    println!("{trimmed}");
101
102                    // Write to log file
103                    if let Some(ref mut f) = log_file {
104                        use std::io::Write;
105                        let _ = writeln!(f, "{trimmed}");
106                    }
107
108                    let extracted = extractor.extract_from_line(trimmed);
109                    if !extracted.is_empty() {
110                        for (k, v) in &extracted {
111                            latest_metrics.insert(k.clone(), *v);
112                        }
113                        lines_processed += 1;
114
115                        if lines_processed.is_multiple_of(10) {
116                            if let Some(ref s) = store {
117                                let _ = s.update_metrics(&exp_id_clone, &latest_metrics);
118                            }
119                        }
120                    }
121                }
122                Err(_) => break,
123            }
124        }
125        latest_metrics
126    });
127
128    // Forward stderr
129    let stderr_handle = tokio::spawn(async move {
130        let mut reader = BufReader::new(stderr);
131        let mut line = String::new();
132        loop {
133            line.clear();
134            match reader.read_line(&mut line).await {
135                Ok(0) => break,
136                Ok(_) => {
137                    eprint!("{}", line);
138                }
139                Err(_) => break,
140            }
141        }
142    });
143
144    let (latest_metrics, _) = tokio::join!(stdout_handle, stderr_handle);
145    let latest_metrics = latest_metrics.unwrap_or_default();
146
147    let status = child.wait().await?;
148    let duration = start.elapsed();
149
150    let final_status = if status.success() {
151        ExperimentStatus::Done
152    } else {
153        ExperimentStatus::Failed
154    };
155
156    store.update_metrics(&exp_id, &latest_metrics)?;
157    store.finish(&exp_id, final_status.clone(), duration.as_secs_f64())?;
158
159    println!();
160    println!("---");
161    println!("  Status:    {final_status}");
162    println!("  Duration:  {:.1}s", duration.as_secs_f64());
163    println!("  Experiment: {exp_id}");
164
165    if !latest_metrics.is_empty() {
166        println!("  Metrics:");
167        let mut sorted: Vec<_> = latest_metrics.iter().collect();
168        sorted.sort_by_key(|(k, _)| (*k).clone());
169        for (k, v) in sorted {
170            println!("    {k}: {v:.4}");
171        }
172    }
173
174    println!();
175    println!("View: zernel exp show {exp_id}");
176
177    if !status.success() {
178        std::process::exit(status.code().unwrap_or(1));
179    }
180
181    Ok(())
182}
183
/// Picks the Python interpreter used to launch training scripts.
///
/// Tries `python3`, then `python`, and returns the first one whose
/// `--version` invocation both launches and exits successfully. A candidate
/// that starts but exits with a failure status is rejected, so a broken shim
/// does not shadow a working interpreter. Falls back to `"python3"` when
/// neither works; the subsequent spawn then reports the launch error.
fn detect_python() -> String {
    ["python3", "python"]
        .iter()
        .copied()
        .find(|candidate| {
            std::process::Command::new(candidate)
                .arg("--version")
                .output()
                .is_ok_and(|out| out.status.success())
        })
        .unwrap_or("python3")
        .to_string()
}