// nextest_runner/test_command/imp.rs
use crate::{
errors::{ChildFdError, ErrorList},
test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
};
use bytes::BytesMut;
use std::{io, process::Stdio, sync::Arc};
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncRead, BufReader},
process::{Child as TokioChild, ChildStderr, ChildStdout},
};
// Select the platform-specific implementation module at compile time and expose
// it under the common alias `os`, so the rest of this file can refer to
// `os::State` and `os::setup_io` without further `cfg` checks.
cfg_if::cfg_if! {
if #[cfg(unix)] {
#[path = "unix.rs"]
mod unix;
use unix as os;
} else if #[cfg(windows)] {
#[path = "windows.rs"]
mod windows;
use windows as os;
} else {
// Neither Unix nor Windows: fail the build with an explicit message.
compile_error!("unsupported target platform");
}
}
/// A spawned child process paired with the readers for its captured output.
///
/// Values of this type are produced by [`spawn`].
pub(crate) struct Child {
/// Handle to the spawned Tokio child process.
pub child: TokioChild,
/// Readers for the child's output. Which variant is used depends on the
/// [`CaptureStrategy`] that was passed to [`spawn`].
pub child_fds: ChildFds,
}
/// Spawns `cmd` as a Tokio child process, wiring up its output according to
/// `strategy`.
///
/// The child's stdin is always redirected to the null device. Output handling:
///
/// * [`CaptureStrategy::None`]: stdout and stderr are left untouched and the
///   returned [`ChildFds`] is a split set with no readers.
/// * [`CaptureStrategy::Split`]: stdout and stderr are each piped and wrapped
///   in their own reader.
/// * [`CaptureStrategy::Combined`]: `os::setup_io` configures the command, and
///   the `ours` handle of the state it returns is read as a single stream.
///
/// # Errors
///
/// Returns the I/O error produced by `os::setup_io` or by spawning the process.
///
/// # Panics
///
/// Panics if the handles configured before spawning are unexpectedly missing
/// afterwards; that would indicate a bug rather than a runtime condition.
pub(super) fn spawn(
    mut cmd: std::process::Command,
    strategy: CaptureStrategy,
) -> std::io::Result<Child> {
    cmd.stdin(Stdio::null());

    // Configure the output handles on the std command before it is converted.
    let io_state: Option<os::State> = match strategy {
        CaptureStrategy::Combined => Some(os::setup_io(&mut cmd)?),
        CaptureStrategy::Split => {
            cmd.stdout(Stdio::piped());
            cmd.stderr(Stdio::piped());
            None
        }
        CaptureStrategy::None => None,
    };

    let mut tokio_cmd = tokio::process::Command::from(cmd);
    let mut child = tokio_cmd.spawn()?;

    // Collect our ends of whatever handles were configured above.
    let child_fds = match strategy {
        CaptureStrategy::Combined => {
            ChildFds::new_combined(std::fs::File::from(io_state.expect("state was set").ours).into())
        }
        CaptureStrategy::Split => {
            let out = child.stdout.take().expect("stdout was set");
            let err = child.stderr.take().expect("stderr was set");
            ChildFds::new_split(Some(out), Some(err))
        }
        CaptureStrategy::None => ChildFds::new_split(None, None),
    };

    Ok(Child { child, child_fds })
}
/// Size in bytes (4 KiB) used both as the capacity of each reader's internal
/// buffer and as the initial capacity of each output accumulator.
const CHUNK_SIZE: usize = 4 * 1024;
/// A buffered reader that remembers when it has finished.
///
/// Once a read has returned an empty buffer or an error, the reader is marked
/// done and later `fill_buf` calls return `Ok(())` without touching the
/// underlying reader.
pub(crate) struct FusedBufReader<R> {
/// The wrapped buffered reader.
reader: BufReader<R>,
/// Set once an empty read or a read error has been observed.
done: bool,
}
impl<R: AsyncRead + Unpin> FusedBufReader<R> {
    /// Wraps `reader` in a [`BufReader`] whose buffer holds `CHUNK_SIZE` bytes.
    pub(crate) fn new(reader: R) -> Self {
        let reader = BufReader::with_capacity(CHUNK_SIZE, reader);
        Self {
            reader,
            done: false,
        }
    }

    /// Reads the next available chunk and appends it to `acc`.
    ///
    /// Does nothing and returns `Ok(())` if the reader is already done. An
    /// empty chunk marks the reader as done.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error; the reader is marked done first, so
    /// the same error is not hit again on a later call.
    pub(crate) async fn fill_buf(&mut self, acc: &mut BytesMut) -> Result<(), io::Error> {
        if self.done {
            return Ok(());
        }

        let chunk = match self.reader.fill_buf().await {
            Ok(chunk) => chunk,
            Err(error) => {
                self.done = true;
                return Err(error);
            }
        };

        // Copy the chunk out, then tell the reader that all of it was used.
        let len = chunk.len();
        acc.extend_from_slice(chunk);
        self.reader.consume(len);

        if len == 0 {
            self.done = true;
        }
        Ok(())
    }

    /// Returns `true` once an empty read or a read error has been observed.
    pub(crate) fn is_done(&self) -> bool {
        self.done
    }
}
/// Fills `acc` from `reader` if a reader is present; otherwise returns `Ok(())`.
///
/// # Panics
///
/// Panics if `reader` is `Some` while `acc` is `None`: the two must always be
/// present or absent together.
async fn fill_buf_opt<R: AsyncRead + Unpin>(
    reader: Option<&mut FusedBufReader<R>>,
    acc: Option<&mut BytesMut>,
) -> Result<(), io::Error> {
    match reader {
        Some(reader) => {
            let acc = acc.expect("reader and acc must match");
            reader.fill_buf(acc).await
        }
        None => Ok(()),
    }
}
/// Returns `true` if `reader` is absent or has finished reading.
fn is_done_opt<R: AsyncRead + Unpin>(reader: &Option<FusedBufReader<R>>) -> bool {
    match reader {
        Some(inner) => inner.is_done(),
        // A stream that is not being captured has nothing left to read.
        None => true,
    }
}
/// Reads a child's output streams and collects the bytes read so far, along
/// with any read errors encountered on the way.
pub(crate) struct ChildAccumulator {
/// The readers the output is pulled from.
pub(crate) fds: ChildFds,
/// The bytes accumulated so far, in the same shape (split or combined) as
/// `fds`.
pub(crate) output: ChildOutputMut,
/// Every read error recorded by `fill_buf`, in the order encountered.
pub(crate) errors: Vec<ChildFdError>,
}
impl ChildAccumulator {
    /// Creates an accumulator for `fds`, with empty output buffers matching
    /// the shape of `fds` and an empty error list.
    pub(crate) fn new(fds: ChildFds) -> Self {
        Self {
            // Evaluated before `fds` is moved into the struct below.
            output: fds.make_acc(),
            fds,
            errors: Vec::new(),
        }
    }

    /// Performs one read step, appending new bytes to `output`.
    ///
    /// A read error is not returned; it is recorded in `errors` instead.
    pub(crate) async fn fill_buf(&mut self) {
        if let Err(error) = self.fds.fill_buf(&mut self.output).await {
            self.errors.push(error);
        }
    }

    /// Builds a [`ChildExecutionOutput`] from a copy of what has been
    /// collected so far, with no result set.
    ///
    /// `error_description` is passed to [`ErrorList::new`] together with a
    /// clone of the errors recorded so far.
    pub(crate) fn snapshot_in_progress(
        &self,
        error_description: &'static str,
    ) -> ChildExecutionOutput {
        let output = self.output.snapshot();
        let errors = ErrorList::new(error_description, self.errors.clone());
        ChildExecutionOutput::Output {
            result: None,
            output,
            errors,
        }
    }
}
/// The readers attached to a child process's output.
pub(crate) enum ChildFds {
/// Standard output and standard error are read separately.
Split {
/// Reader for the child's stdout, or `None` if stdout is not captured.
stdout: Option<FusedBufReader<ChildStdout>>,
/// Reader for the child's stderr, or `None` if stderr is not captured.
stderr: Option<FusedBufReader<ChildStderr>>,
},
/// All captured output is read from one stream.
Combined { combined: FusedBufReader<File> },
}
impl ChildFds {
    /// Creates a split set of readers, wrapping each handle that is present.
    pub(crate) fn new_split(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) -> Self {
        let stdout = stdout.map(FusedBufReader::new);
        let stderr = stderr.map(FusedBufReader::new);
        Self::Split { stdout, stderr }
    }

    /// Creates a combined reader over `file`.
    pub(crate) fn new_combined(file: File) -> Self {
        let combined = FusedBufReader::new(file);
        Self::Combined { combined }
    }

    /// Returns `true` once every reader has finished. An absent reader counts
    /// as finished.
    pub(crate) fn is_done(&self) -> bool {
        match self {
            Self::Combined { combined } => combined.is_done(),
            Self::Split { stdout, stderr } => {
                let stdout_done = is_done_opt(stdout);
                stdout_done && is_done_opt(stderr)
            }
        }
    }
}
impl ChildFds {
    /// Creates an empty accumulator with the same shape as `self`: one buffer
    /// per reader that is present, each with `CHUNK_SIZE` bytes of capacity.
    pub(crate) fn make_acc(&self) -> ChildOutputMut {
        let fresh = || BytesMut::with_capacity(CHUNK_SIZE);
        match self {
            Self::Combined { .. } => ChildOutputMut::Combined(fresh()),
            Self::Split { stdout, stderr } => ChildOutputMut::Split {
                stdout: stdout.is_some().then(fresh),
                stderr: stderr.is_some().then(fresh),
            },
        }
    }

    /// Performs one read step, appending whatever was read to `acc`.
    ///
    /// In the split case, each stream that is not yet done is polled and the
    /// first read to complete is appended to its buffer. If every reader is
    /// already done, this returns `Ok(())` immediately.
    ///
    /// # Errors
    ///
    /// Returns the I/O error, wrapped in the [`ChildFdError`] variant for the
    /// stream it came from.
    ///
    /// # Panics
    ///
    /// Panics if the shape of `acc` does not match the shape of `self`.
    pub(crate) async fn fill_buf(&mut self, acc: &mut ChildOutputMut) -> Result<(), ChildFdError> {
        match self {
            Self::Combined { combined } => {
                if combined.is_done() {
                    return Ok(());
                }
                let outcome = combined.fill_buf(acc.as_combined_mut()).await;
                outcome.map_err(|io_err| ChildFdError::ReadCombined(Arc::new(io_err)))
            }
            Self::Split { stdout, stderr } => {
                let (out_acc, err_acc) = acc.as_split_mut();
                // Branches for finished (or absent) streams are disabled by
                // their preconditions; `else` runs when both are disabled.
                tokio::select! {
                    outcome = fill_buf_opt(stdout.as_mut(), out_acc), if !is_done_opt(stdout) => {
                        outcome.map_err(|io_err| ChildFdError::ReadStdout(Arc::new(io_err)))
                    }
                    outcome = fill_buf_opt(stderr.as_mut(), err_acc), if !is_done_opt(stderr) => {
                        outcome.map_err(|io_err| ChildFdError::ReadStderr(Arc::new(io_err)))
                    }
                    else => {
                        Ok(())
                    }
                }
            }
        }
    }
}
/// Growable buffers holding the output read from a child so far.
pub(crate) enum ChildOutputMut {
/// Separate buffers for standard output and standard error.
Split {
/// Bytes read from stdout, or `None` if stdout is not captured.
stdout: Option<BytesMut>,
/// Bytes read from stderr, or `None` if stderr is not captured.
stderr: Option<BytesMut>,
},
/// A single buffer holding the combined output stream.
Combined(BytesMut),
}
impl ChildOutputMut {
    /// Returns mutable access to the stdout and stderr buffers.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not the `Split` variant.
    fn as_split_mut(&mut self) -> (Option<&mut BytesMut>, Option<&mut BytesMut>) {
        if let Self::Split { stdout, stderr } = self {
            (stdout.as_mut(), stderr.as_mut())
        } else {
            panic!("ChildOutput is not split")
        }
    }

    /// Returns mutable access to the combined buffer.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not the `Combined` variant.
    fn as_combined_mut(&mut self) -> &mut BytesMut {
        if let Self::Combined(buf) = self {
            buf
        } else {
            panic!("ChildOutput is not combined")
        }
    }

    /// Returns an immutable copy of the output collected so far, leaving the
    /// buffers in `self` untouched so that reading can continue.
    pub(crate) fn snapshot(&self) -> ChildOutput {
        match self {
            Self::Combined(buf) => ChildOutput::Combined {
                output: buf.clone().freeze().into(),
            },
            Self::Split { stdout, stderr } => {
                let stdout = stdout.as_ref().map(|buf| buf.clone().freeze().into());
                let stderr = stderr.as_ref().map(|buf| buf.clone().freeze().into());
                ChildOutput::Split(ChildSplitOutput { stdout, stderr })
            }
        }
    }

    /// Consumes the buffers and converts them into immutable output without
    /// copying.
    pub(crate) fn freeze(self) -> ChildOutput {
        match self {
            Self::Combined(buf) => ChildOutput::Combined {
                output: buf.freeze().into(),
            },
            Self::Split { stdout, stderr } => {
                let stdout = stdout.map(|buf| buf.freeze().into());
                let stderr = stderr.map(|buf| buf.freeze().into());
                ChildOutput::Split(ChildSplitOutput { stdout, stderr })
            }
        }
    }
}