// zernel/commands/migrate.rs

use anyhow::{Context, Result};
use clap::Subcommand;
use std::process::Command;
/// Subcommands for migrating CUDA training jobs between GPUs.
///
/// Doc comments on variants/fields become `--help` text via clap's
/// `Subcommand` derive.
#[derive(Subcommand)]
pub enum MigrateCommands {
    /// Migrate a running job from one GPU to another (signal + guidance).
    Job {
        /// PID of the CUDA process to migrate.
        pid: u32,
        /// Source GPU index the job currently runs on.
        #[arg(long)]
        from: u32,
        /// Destination GPU index to move the job to.
        #[arg(long)]
        to: u32,
    },
    /// List running CUDA processes that are candidates for migration.
    List,
    /// Ask a process to checkpoint itself by sending SIGUSR1.
    Checkpoint {
        /// PID of the process to checkpoint.
        pid: u32,
        /// Directory created to hold checkpoint artifacts.
        #[arg(long, default_value = "/tmp/zernel-checkpoint")]
        output: String,
    },
    /// Print instructions for resuming training from a checkpoint.
    Resume {
        /// Path to an existing checkpoint file or directory.
        path: String,
        /// GPU index to resume the job on.
        #[arg(long, default_value = "0")]
        gpu: u32,
    },
}
44
45pub async fn run(cmd: MigrateCommands) -> Result<()> {
46 match cmd {
47 MigrateCommands::Job { pid, from, to } => {
48 println!("Zernel GPU Migration");
49 println!("{}", "=".repeat(50));
50 println!(" PID: {pid}");
51 println!(" From: GPU {from}");
52 println!(" To: GPU {to}");
53 println!();
54
55 #[cfg(unix)]
57 {
58 let proc_path = format!("/proc/{pid}");
59 if !std::path::Path::new(&proc_path).exists() {
60 anyhow::bail!("process {pid} not found");
61 }
62 }
63
64 println!("[1/4] Signaling process to checkpoint...");
66 #[cfg(unix)]
67 unsafe {
68 libc::kill(pid as i32, libc::SIGUSR1);
69 }
70 println!(" Sent SIGUSR1 to PID {pid}");
71 println!(" (PyTorch Lightning, DeepSpeed, and FSDP handle SIGUSR1 for checkpointing)");
72
73 println!("[2/4] Waiting for checkpoint to complete...");
75 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
76
77 println!("[3/4] Setting CUDA_VISIBLE_DEVICES={to} for PID {pid}...");
79 println!(" Note: CUDA_VISIBLE_DEVICES cannot be changed for a running process.");
80 println!(" The job must be restarted with the new GPU assignment.");
81 println!();
82
83 println!("[4/4] Migration guidance:");
85 println!(" 1. The checkpoint signal has been sent.");
86 println!(" 2. Wait for the training loop to save a checkpoint.");
87 println!(" 3. Stop the process: kill {pid}");
88 println!(" 4. Restart with new GPU:");
89 println!(" CUDA_VISIBLE_DEVICES={to} python train.py --resume-from-checkpoint");
90 println!();
91 println!(" For PyTorch Lightning:");
92 println!(" CUDA_VISIBLE_DEVICES={to} python train.py --ckpt_path=last");
93 println!();
94 println!(" For DeepSpeed:");
95 println!(" CUDA_VISIBLE_DEVICES={to} deepspeed train.py --deepspeed_config ds.json --resume");
96 }
97
98 MigrateCommands::List => {
99 println!("Migration-Capable Jobs (CUDA processes)");
100 println!("{}", "=".repeat(60));
101
102 let output = Command::new("nvidia-smi")
103 .args([
104 "--query-compute-apps",
105 "pid,process_name,gpu_uuid,used_gpu_memory",
106 "--format=csv,noheader,nounits",
107 ])
108 .output()
109 .with_context(|| "nvidia-smi not found")?;
110
111 let stdout = String::from_utf8_lossy(&output.stdout);
112
113 if stdout.trim().is_empty() {
114 println!(" No CUDA processes running.");
115 return Ok(());
116 }
117
118 println!(
119 "{:<8} {:<30} {:>10} {:>8}",
120 "PID", "Process", "GPU Mem", "GPU"
121 );
122 println!("{}", "-".repeat(60));
123
124 let uuid_map = Command::new("nvidia-smi")
126 .args(["--query-gpu=index,uuid", "--format=csv,noheader"])
127 .output()
128 .ok();
129
130 let mut uuid_to_idx = std::collections::HashMap::new();
131 if let Some(ref o) = uuid_map {
132 for line in String::from_utf8_lossy(&o.stdout).lines() {
133 let f: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
134 if f.len() >= 2 {
135 uuid_to_idx.insert(f[1].to_string(), f[0].to_string());
136 }
137 }
138 }
139
140 for line in stdout.lines() {
141 let f: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
142 if f.len() >= 4 {
143 let gpu_idx = uuid_to_idx.get(f[2]).cloned().unwrap_or_else(|| "?".into());
144 println!("{:<8} {:<30} {:>7} MB {:>8}", f[0], f[1], f[3], gpu_idx);
145 }
146 }
147
148 println!();
149 println!("Migrate: zernel migrate job <PID> --from <gpu> --to <gpu>");
150 }
151
152 MigrateCommands::Checkpoint { pid, output } => {
153 println!("Checkpointing PID {pid} to {output}...");
154 std::fs::create_dir_all(&output)?;
155
156 #[cfg(unix)]
157 unsafe {
158 libc::kill(pid as i32, libc::SIGUSR1);
159 }
160
161 println!(" SIGUSR1 sent. Waiting for framework to save checkpoint...");
162 println!(" Check framework logs for checkpoint save confirmation.");
163 println!();
164 println!(" Expected checkpoint location depends on framework:");
165 println!(" PyTorch Lightning: lightning_logs/");
166 println!(" DeepSpeed: output_dir/checkpoint-*/");
167 println!(" HuggingFace Trainer: output_dir/checkpoint-*/");
168 }
169
170 MigrateCommands::Resume { path, gpu } => {
171 println!("Resuming from checkpoint: {path}");
172 println!(" Target GPU: {gpu}");
173 println!();
174
175 if !std::path::Path::new(&path).exists() {
176 anyhow::bail!("checkpoint path not found: {path}");
177 }
178
179 println!(" Restart your training with:");
180 println!(" CUDA_VISIBLE_DEVICES={gpu} python train.py --resume {path}");
181 }
182 }
183 Ok(())
184}