sup/src/run.rs

169 lines
4.2 KiB

use std::boxed::Box;
use std::sync::mpsc::TryRecvError;
use colored::Colorize;
use log::{error, info};
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_child_process::{ProcessEvent, StdioChannel};
use signal_hook::iterator::Signals;
use crate::proc::Proc;
use crate::result::Result;
/// check for holds and start the loop
/// then do stuffs?
pub fn run(procs: Vec<Proc>, holds: Option<i8>, poll: Poll) -> Result<Vec<Proc>> {
if holds.unwrap_or(0) < 1 {
error!("no holds configured");
return Ok(procs);
}
match Signals::new(&[signal_hook::SIGINT]) {
Ok(signals) => {
poll.register(
&signals,
Token(std::usize::MAX - 1),
Ready::readable(),
PollOpt::edge(),
)?;
Ok(run_loop(procs, holds, poll, signals))
}
Err(e) => {
error!("unable to register signals: {}", e);
Err(Box::new(e))
}
}
}
/// restart an App, return the new Proc or report error
fn restart_app(proc: &Proc) -> Result<Proc> {
let app = proc.app.clone();
let name = proc.app.name.clone();
info!("[{}] restarting", name.yellow());
match Proc::start(app) {
Ok(p) => Ok(p),
Err(e) => {
error!("[{}] error restarting: {:?}", name.yellow(), e);
Err(e)
}
}
}
fn find_proc_by_token(token: Token, procs: &[Proc]) -> Option<usize> {
for (counter, proc) in procs.iter().enumerate() {
if proc.token == token {
return Some(counter);
}
}
None
}
fn process_event_data(channel: StdioChannel, data: String, app_name: &str) {
let c = match channel {
StdioChannel::Stdout => "stdout",
StdioChannel::Stderr => "stderr",
};
info!("[{}] {}: {}", app_name, c, data.trim_end());
}
/// run the main loop until out of holds
fn run_loop(procs: Vec<Proc>, holds: Option<i8>, poll: Poll, signals: Signals) -> Vec<Proc> {
let mut procs = procs;
let mut holds = holds;
let mut events = Events::with_capacity(1024);
while holds > Some(0) {
poll.poll(&mut events, None).unwrap();
for event in events.iter() {
if event.token() == Token(std::usize::MAX - 1) {
for signal in signals.pending() {
match signal {
signal_hook::SIGINT => info!("received SIGINT"),
s => info!("got signal {}", s),
};
}
} else if let Some(i) = find_proc_by_token(event.token(), &procs) {
let mut proc = procs.remove(i);
match proc.process.try_recv() {
// got something on stdout or stderr
Ok(ProcessEvent::Data(channel, data)) => {
process_event_data(channel, data, &proc.app.name);
procs.push(proc);
}
// error starting or closing child process
Ok(ProcessEvent::CommandError(e)) => {
error!("[{}] command error: {:?}", &proc.app.name.yellow(), e)
}
// error reading stdout or stderr
Ok(ProcessEvent::IoError(channel, e)) => {
error!(
"[{}] io error on {:?}: {:?}",
&proc.app.name.yellow(),
channel,
e
)
}
// error doing utf8 translation
Ok(ProcessEvent::Utf8Error(channel, e)) => {
error!(
"[{}] utf8 error on {:?}: {:?}",
&proc.app.name.yellow(),
channel,
e
)
}
// process exited
Ok(ProcessEvent::Exit(status)) => {
let hold = proc.app.hold;
if let Some(code) = status.code() {
if code == 0 {
info!("[{}] exited successfully", &proc.app.name.yellow());
if proc.app.restart_on_failure {
if let Ok(p) = restart_app(&proc) {
poll.register(&p.process, p.token, Ready::all(), PollOpt::edge())
.unwrap();
procs.push(p);
} else if hold {
holds = Some(holds.unwrap_or(0) - 1);
}
}
} else {
info!("[{}] exited unsuccessfully with code {}", &proc.app.name.yellow(), code);
}
} else {
info!("[{}] was terminated", &proc.app.name.yellow());
}
if proc.app.check_restart(status) {
if let Ok(p) = restart_app(&proc) {
poll.register(&p.process, p.token, Ready::all(), PollOpt::edge())
.unwrap();
procs.push(p);
} else if hold {
holds = Some(holds.unwrap_or(0) - 1);
}
} else if hold {
holds = Some(holds.unwrap_or(0) - 1);
}
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => {}
};
}
}
}
procs
}