1use crate::experiments::store::{Experiment, ExperimentStatus, ExperimentStore};
4use crate::experiments::tracker::{self, MetricExtractor};
5use anyhow::{Context, Result};
6use chrono::Utc;
7use std::collections::HashMap;
8use std::process::Stdio;
9use tokio::io::{AsyncBufReadExt, BufReader};
10
11pub async fn run(script: &str, args: &[String]) -> Result<()> {
13 let db_path = tracker::experiments_db_path();
14 let store = ExperimentStore::open(&db_path)?;
15 let extractor = MetricExtractor::new();
16
17 let exp_id = tracker::generate_experiment_id();
18 let exp_name = std::path::Path::new(script)
19 .file_stem()
20 .map(|s| s.to_string_lossy().to_string())
21 .unwrap_or_else(|| "unnamed".into());
22
23 let git_commit = std::process::Command::new("git")
24 .args(["rev-parse", "--short", "HEAD"])
25 .output()
26 .ok()
27 .and_then(|o| {
28 if o.status.success() {
29 Some(String::from_utf8_lossy(&o.stdout).trim().to_string())
30 } else {
31 None
32 }
33 });
34
35 let experiment = Experiment {
36 id: exp_id.clone(),
37 name: exp_name.clone(),
38 status: ExperimentStatus::Running,
39 hyperparams: HashMap::new(),
40 metrics: HashMap::new(),
41 created_at: Utc::now(),
42 finished_at: None,
43 git_commit,
44 script: Some(script.to_string()),
45 duration_secs: None,
46 };
47
48 store.insert(&experiment)?;
49
50 println!("Zernel Run");
51 println!(" Experiment: {exp_id}");
52 println!(" Script: {script}");
53 println!(" Tracking: enabled");
54 println!();
55
56 let python = detect_python();
57 let start = std::time::Instant::now();
58
59 let mut child = tokio::process::Command::new(&python)
60 .arg(script)
61 .args(args)
62 .stdout(Stdio::piped())
63 .stderr(Stdio::piped())
64 .spawn()
65 .with_context(|| format!("failed to launch: {python} {script}"))?;
66
67 let stdout = child
68 .stdout
69 .take()
70 .ok_or_else(|| anyhow::anyhow!("failed to capture child stdout"))?;
71 let stderr = child
72 .stderr
73 .take()
74 .ok_or_else(|| anyhow::anyhow!("failed to capture child stderr"))?;
75
76 let exp_id_clone = exp_id.clone();
77 let db_path_clone = db_path.clone();
78
79 let log_dir = tracker::zernel_dir().join("experiments").join(&exp_id);
81 std::fs::create_dir_all(&log_dir).ok();
82 let log_path = log_dir.join("output.log");
83
84 let stdout_handle = tokio::spawn(async move {
86 let mut latest_metrics: HashMap<String, f64> = HashMap::new();
87 let mut lines_processed = 0u64;
88 let mut reader = BufReader::new(stdout);
89 let mut line = String::new();
90 let mut log_file = std::fs::File::create(&log_path).ok();
91
92 let store = ExperimentStore::open(&db_path_clone).ok();
93
94 loop {
95 line.clear();
96 match reader.read_line(&mut line).await {
97 Ok(0) => break,
98 Ok(_) => {
99 let trimmed = line.trim_end();
100 println!("{trimmed}");
101
102 if let Some(ref mut f) = log_file {
104 use std::io::Write;
105 let _ = writeln!(f, "{trimmed}");
106 }
107
108 let extracted = extractor.extract_from_line(trimmed);
109 if !extracted.is_empty() {
110 for (k, v) in &extracted {
111 latest_metrics.insert(k.clone(), *v);
112 }
113 lines_processed += 1;
114
115 if lines_processed.is_multiple_of(10) {
116 if let Some(ref s) = store {
117 let _ = s.update_metrics(&exp_id_clone, &latest_metrics);
118 }
119 }
120 }
121 }
122 Err(_) => break,
123 }
124 }
125 latest_metrics
126 });
127
128 let stderr_handle = tokio::spawn(async move {
130 let mut reader = BufReader::new(stderr);
131 let mut line = String::new();
132 loop {
133 line.clear();
134 match reader.read_line(&mut line).await {
135 Ok(0) => break,
136 Ok(_) => {
137 eprint!("{}", line);
138 }
139 Err(_) => break,
140 }
141 }
142 });
143
144 let (latest_metrics, _) = tokio::join!(stdout_handle, stderr_handle);
145 let latest_metrics = latest_metrics.unwrap_or_default();
146
147 let status = child.wait().await?;
148 let duration = start.elapsed();
149
150 let final_status = if status.success() {
151 ExperimentStatus::Done
152 } else {
153 ExperimentStatus::Failed
154 };
155
156 store.update_metrics(&exp_id, &latest_metrics)?;
157 store.finish(&exp_id, final_status.clone(), duration.as_secs_f64())?;
158
159 println!();
160 println!("---");
161 println!(" Status: {final_status}");
162 println!(" Duration: {:.1}s", duration.as_secs_f64());
163 println!(" Experiment: {exp_id}");
164
165 if !latest_metrics.is_empty() {
166 println!(" Metrics:");
167 let mut sorted: Vec<_> = latest_metrics.iter().collect();
168 sorted.sort_by_key(|(k, _)| (*k).clone());
169 for (k, v) in sorted {
170 println!(" {k}: {v:.4}");
171 }
172 }
173
174 println!();
175 println!("View: zernel exp show {exp_id}");
176
177 if !status.success() {
178 std::process::exit(status.code().unwrap_or(1));
179 }
180
181 Ok(())
182}
183
/// Picks the command name used to launch Python scripts.
///
/// Tries `python3` and then `python`, returning the first candidate for which
/// `<candidate> --version` both starts and exits successfully. Checking only that the
/// process could be spawned is not enough: a launcher stub can start and then exit
/// with a failure code without being a working interpreter.
///
/// Falls back to `"python3"` when no candidate passes, so the caller's launch attempt
/// produces the error message.
fn detect_python() -> String {
    const CANDIDATES: [&str; 2] = ["python3", "python"];

    for candidate in CANDIDATES.iter() {
        // Only the exit status matters, so the probe's output is discarded.
        let probe = std::process::Command::new(candidate)
            .arg("--version")
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .status();

        if probe.map(|status| status.success()).unwrap_or(false) {
            return candidate.to_string();
        }
    }
    "python3".to_string()
}