diff --git a/libshpool/src/protocol.rs b/libshpool/src/protocol.rs index 754dca45..b1698302 100644 --- a/libshpool/src/protocol.rs +++ b/libshpool/src/protocol.rs @@ -249,81 +249,87 @@ impl Client { let exit_status = AtomicI32::new(1); thread::scope(|s| { // stdin -> sock - let stdin_to_sock_h = s.spawn(|| -> anyhow::Result<()> { - let _s = span!(Level::INFO, "stdin->sock").entered(); - let mut stdin = std::io::stdin().lock(); - let mut buf = vec![0; consts::BUF_SIZE]; - - loop { - let nread = stdin.read(&mut buf).context("reading stdin from user")?; - if nread == 0 { - continue; - } - debug!("read {} bytes", nread); + let stdin_to_sock_h = thread::Builder::new() + .name("stdin_to_sock".to_string()) + .spawn_scoped(s, || -> anyhow::Result<()> { + let _s = span!(Level::INFO, "stdin->sock").entered(); + let mut stdin = std::io::stdin().lock(); + let mut buf = vec![0; consts::BUF_SIZE]; + + loop { + let nread = stdin.read(&mut buf).context("reading stdin from user")?; + if nread == 0 { + continue; + } + debug!("read {} bytes", nread); - let to_write = &buf[..nread]; - trace!("created to_write='{}'", String::from_utf8_lossy(to_write)); + let to_write = &buf[..nread]; + trace!("created to_write='{}'", String::from_utf8_lossy(to_write)); - write_client_stream.write_all(to_write)?; - write_client_stream.flush().context("flushing client")?; - } - }); + write_client_stream.write_all(to_write)?; + write_client_stream.flush().context("flushing client")?; + } + }) + .unwrap(); // sock -> stdout - let sock_to_stdout_h = s.spawn(|| -> anyhow::Result<()> { - let _s = span!(Level::INFO, "sock->stdout").entered(); - - let mut stdout = std::io::stdout().lock(); - let mut buf = vec![0; consts::BUF_SIZE]; - - loop { - let chunk = match Chunk::read_into(&mut read_client_stream, &mut buf) { - Ok(c) => c, - Err(err) => { - error!("reading chunk: {:?}", err); - return Err(err); + let sock_to_stdout_h = thread::Builder::new() + .name("sock_to_stdout".to_string()) + .spawn_scoped(s, || -> anyhow::Result<()> { + let _s = span!(Level::INFO, "sock->stdout").entered(); + + let mut stdout = std::io::stdout().lock(); + let mut buf = vec![0; consts::BUF_SIZE]; + + loop { + let chunk = match Chunk::read_into(&mut read_client_stream, &mut buf) { + Ok(c) => c, + Err(err) => { + error!("reading chunk: {:?}", err); + return Err(err); + } + }; + + if !chunk.buf.is_empty() { + debug!( + "chunk='{}' kind={:?} len={}", + String::from_utf8_lossy(chunk.buf), + chunk.kind, + chunk.buf.len() + ); } - }; - - if !chunk.buf.is_empty() { - debug!( - "chunk='{}' kind={:?} len={}", - String::from_utf8_lossy(chunk.buf), - chunk.kind, - chunk.buf.len() - ); - } - match chunk.kind { - ChunkKind::Heartbeat => { - trace!("got heartbeat chunk"); - } - ChunkKind::Data => { - stdout.write_all(chunk.buf).context("writing chunk to stdout")?; - - if let Err(e) = stdout.flush() { - if e.kind() == std::io::ErrorKind::WouldBlock { - // If the fd is busy, we are likely just getting - // flooded with output and don't need to worry about - // flushing every last byte. Flushing is really - // about interactive situations where we want to - // see echoed bytes immediately. - continue; + match chunk.kind { + ChunkKind::Heartbeat => { + trace!("got heartbeat chunk"); + } + ChunkKind::Data => { + stdout.write_all(chunk.buf).context("writing chunk to stdout")?; + + if let Err(e) = stdout.flush() { + if e.kind() == std::io::ErrorKind::WouldBlock { + // If the fd is busy, we are likely just getting + // flooded with output and don't need to worry about + // flushing every last byte. Flushing is really + // about interactive situations where we want to + // see echoed bytes immediately. + continue; + } } + debug!("flushed stdout"); + } + ChunkKind::ExitStatus => { + let mut status_reader = io::Cursor::new(chunk.buf); + let stat = status_reader + .read_i32::() + .context("reading exit status from exit status chunk")?; + info!("got exit status frame (status={})", stat); + exit_status.store(stat, Ordering::Release); } - debug!("flushed stdout"); - } - ChunkKind::ExitStatus => { - let mut status_reader = io::Cursor::new(chunk.buf); - let stat = status_reader - .read_i32::() - .context("reading exit status from exit status chunk")?; - info!("got exit status frame (status={})", stat); - exit_status.store(stat, Ordering::Release); } } - } - }); + }) + .unwrap(); loop { let mut nfinished_threads = 0;