nextest_runner/test_command/
imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::{
5    errors::{ChildFdError, ErrorList},
6    test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
7};
8use bytes::BytesMut;
9use std::{io, process::Stdio, sync::Arc};
10use tokio::{
11    fs::File,
12    io::{AsyncBufReadExt, AsyncRead, BufReader},
13    process::{Child as TokioChild, ChildStderr, ChildStdout},
14};
15
16cfg_if::cfg_if! {
17    if #[cfg(unix)] {
18        #[path = "unix.rs"]
19        mod unix;
20        use unix as os;
21    } else if #[cfg(windows)] {
22        #[path = "windows.rs"]
23        mod windows;
24        use windows as os;
25    } else {
26        compile_error!("unsupported target platform");
27    }
28}
29
30/// A spawned child process along with its file descriptors.
31pub(crate) struct Child {
32    pub child: TokioChild,
33    pub child_fds: ChildFds,
34}
35
36pub(super) fn spawn(
37    mut cmd: std::process::Command,
38    strategy: CaptureStrategy,
39) -> std::io::Result<Child> {
40    cmd.stdin(Stdio::null());
41
42    let state: Option<os::State> = match strategy {
43        CaptureStrategy::None => None,
44        CaptureStrategy::Split => {
45            cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
46            None
47        }
48        CaptureStrategy::Combined => Some(os::setup_io(&mut cmd)?),
49    };
50
51    let mut cmd: tokio::process::Command = cmd.into();
52    let mut child = cmd.spawn()?;
53
54    let output = match strategy {
55        CaptureStrategy::None => ChildFds::new_split(None, None),
56        CaptureStrategy::Split => {
57            let stdout = child.stdout.take().expect("stdout was set");
58            let stderr = child.stderr.take().expect("stderr was set");
59
60            ChildFds::new_split(Some(stdout), Some(stderr))
61        }
62        CaptureStrategy::Combined => {
63            ChildFds::new_combined(std::fs::File::from(state.expect("state was set").ours).into())
64        }
65    };
66
67    Ok(Child {
68        child,
69        child_fds: output,
70    })
71}
72
73/// The size of each buffered reader's buffer, and the size at which we grow the combined buffer.
74///
75/// This size is not totally arbitrary, but rather the (normal) page size on most systems.
76const CHUNK_SIZE: usize = 4 * 1024;
77
78/// A `BufReader` over an `AsyncRead` that tracks the state of the reader and
79/// whether it is done.
80pub(crate) struct FusedBufReader<R> {
81    reader: BufReader<R>,
82    done: bool,
83}
84
85impl<R: AsyncRead + Unpin> FusedBufReader<R> {
86    pub(crate) fn new(reader: R) -> Self {
87        Self {
88            reader: BufReader::with_capacity(CHUNK_SIZE, reader),
89            done: false,
90        }
91    }
92
93    pub(crate) async fn fill_buf(&mut self, acc: &mut BytesMut) -> Result<(), io::Error> {
94        if self.done {
95            return Ok(());
96        }
97
98        let res = self.reader.fill_buf().await;
99        match res {
100            Ok(buf) => {
101                acc.extend_from_slice(buf);
102                if buf.is_empty() {
103                    self.done = true;
104                }
105                let len = buf.len();
106                self.reader.consume(len);
107                Ok(())
108            }
109            Err(error) => {
110                self.done = true;
111                Err(error)
112            }
113        }
114    }
115
116    pub(crate) fn is_done(&self) -> bool {
117        self.done
118    }
119}
120
121/// A version of [`FusedBufReader::fill_buf`] that works with an `Option<FusedBufReader>`.
122async fn fill_buf_opt<R: AsyncRead + Unpin>(
123    reader: Option<&mut FusedBufReader<R>>,
124    acc: Option<&mut BytesMut>,
125) -> Result<(), io::Error> {
126    if let Some(reader) = reader {
127        let acc = acc.expect("reader and acc must match");
128        reader.fill_buf(acc).await
129    } else {
130        Ok(())
131    }
132}
133
134/// A version of [`FusedBufReader::is_done`] that works with an `Option<FusedBufReader>`.
135fn is_done_opt<R: AsyncRead + Unpin>(reader: &Option<FusedBufReader<R>>) -> bool {
136    reader.as_ref().is_none_or(|r| r.is_done())
137}
138
139/// Output and result accumulator for a child process.
140pub(crate) struct ChildAccumulator {
141    // TODO: it would be nice to also store the tokio::process::Child here, and
142    // for `fill_buf` to select over it.
143    pub(crate) fds: ChildFds,
144    pub(crate) output: ChildOutputMut,
145    pub(crate) errors: Vec<ChildFdError>,
146}
147
148impl ChildAccumulator {
149    pub(crate) fn new(fds: ChildFds) -> Self {
150        let output = fds.make_acc();
151        Self {
152            fds,
153            output,
154            errors: Vec::new(),
155        }
156    }
157
158    pub(crate) async fn fill_buf(&mut self) {
159        let res = self.fds.fill_buf(&mut self.output).await;
160        if let Err(error) = res {
161            self.errors.push(error);
162        }
163    }
164
165    pub(crate) fn snapshot_in_progress(
166        &self,
167        error_description: &'static str,
168    ) -> ChildExecutionOutput {
169        ChildExecutionOutput::Output {
170            result: None,
171            output: self.output.snapshot(),
172            errors: ErrorList::new(error_description, self.errors.clone()),
173        }
174    }
175}
176
177/// File descriptors (or Windows handles) for the child process.
178pub(crate) enum ChildFds {
179    /// Separate stdout and stderr, or they're not captured.
180    Split {
181        stdout: Option<FusedBufReader<ChildStdout>>,
182        stderr: Option<FusedBufReader<ChildStderr>>,
183    },
184
185    /// Combined stdout and stderr.
186    Combined { combined: FusedBufReader<File> },
187}
188
189impl ChildFds {
190    pub(crate) fn new_split(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) -> Self {
191        Self::Split {
192            stdout: stdout.map(FusedBufReader::new),
193            stderr: stderr.map(FusedBufReader::new),
194        }
195    }
196
197    pub(crate) fn new_combined(file: File) -> Self {
198        Self::Combined {
199            combined: FusedBufReader::new(file),
200        }
201    }
202
203    pub(crate) fn is_done(&self) -> bool {
204        match self {
205            Self::Split { stdout, stderr } => is_done_opt(stdout) && is_done_opt(stderr),
206            Self::Combined { combined } => combined.is_done(),
207        }
208    }
209}
210
211impl ChildFds {
212    /// Makes an empty `ChildOutput` with the appropriate buffers for this `ChildFds`.
213    pub(crate) fn make_acc(&self) -> ChildOutputMut {
214        match self {
215            Self::Split { stdout, stderr } => ChildOutputMut::Split {
216                stdout: stdout.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
217                stderr: stderr.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
218            },
219            Self::Combined { .. } => ChildOutputMut::Combined(BytesMut::with_capacity(CHUNK_SIZE)),
220        }
221    }
222
223    /// Fills one of the buffers in `acc` with available data from the child process.
224    ///
225    /// This is a single step in the process of collecting the output of a child process. This
226    /// operation is cancel-safe, since the underlying [`AsyncBufReadExt::fill_buf`] operation is
227    /// cancel-safe.
228    ///
229    /// We follow this "externalized progress" pattern rather than having the collect output futures
230    /// own the data they're collecting, to enable future improvements where we can dump
231    /// currently-captured output to the terminal.
232    pub(crate) async fn fill_buf(&mut self, acc: &mut ChildOutputMut) -> Result<(), ChildFdError> {
233        match self {
234            Self::Split { stdout, stderr } => {
235                let (stdout_acc, stderr_acc) = acc.as_split_mut();
236                // Wait until either of these make progress.
237                tokio::select! {
238                    res = fill_buf_opt(stdout.as_mut(), stdout_acc), if !is_done_opt(stdout) => {
239                        res.map_err(|error| ChildFdError::ReadStdout(Arc::new(error)))
240                    }
241                    res = fill_buf_opt(stderr.as_mut(), stderr_acc), if !is_done_opt(stderr) => {
242                        res.map_err(|error| ChildFdError::ReadStderr(Arc::new(error)))
243                    }
244                    // If both are done, do nothing.
245                    else => {
246                        Ok(())
247                    }
248                }
249            }
250            Self::Combined { combined } => {
251                if !combined.is_done() {
252                    combined
253                        .fill_buf(acc.as_combined_mut())
254                        .await
255                        .map_err(|error| ChildFdError::ReadCombined(Arc::new(error)))
256                } else {
257                    Ok(())
258                }
259            }
260        }
261    }
262}
263
264/// The output of a child process that's currently being collected.
265pub(crate) enum ChildOutputMut {
266    /// Separate stdout and stderr (`None` if not captured).
267    Split {
268        stdout: Option<BytesMut>,
269        stderr: Option<BytesMut>,
270    },
271    /// Combined stdout and stderr.
272    Combined(BytesMut),
273}
274
275impl ChildOutputMut {
276    fn as_split_mut(&mut self) -> (Option<&mut BytesMut>, Option<&mut BytesMut>) {
277        match self {
278            Self::Split { stdout, stderr } => (stdout.as_mut(), stderr.as_mut()),
279            _ => panic!("ChildOutput is not split"),
280        }
281    }
282
283    fn as_combined_mut(&mut self) -> &mut BytesMut {
284        match self {
285            Self::Combined(combined) => combined,
286            _ => panic!("ChildOutput is not combined"),
287        }
288    }
289
290    /// Makes a snapshot of the current output, returning a [`TestOutput`].
291    ///
292    /// This requires cloning the output so it's more expensive than [`Self::freeze`].
293    pub(crate) fn snapshot(&self) -> ChildOutput {
294        match self {
295            Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
296                stdout: stdout.as_ref().map(|x| x.clone().freeze().into()),
297                stderr: stderr.as_ref().map(|x| x.clone().freeze().into()),
298            }),
299            Self::Combined(combined) => ChildOutput::Combined {
300                output: combined.clone().freeze().into(),
301            },
302        }
303    }
304
305    /// Marks the collection as done, returning a `TestOutput`.
306    pub(crate) fn freeze(self) -> ChildOutput {
307        match self {
308            Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
309                stdout: stdout.map(|x| x.freeze().into()),
310                stderr: stderr.map(|x| x.freeze().into()),
311            }),
312            Self::Combined(combined) => ChildOutput::Combined {
313                output: combined.freeze().into(),
314            },
315        }
316    }
317}