nextest_runner/test_command/
imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::{
5    errors::{ChildFdError, ErrorList},
6    test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
7};
8use bytes::BytesMut;
9use std::{
10    io::{self, PipeReader},
11    process::Stdio,
12    sync::Arc,
13};
14use tokio::{
15    fs::File,
16    io::{AsyncBufReadExt, AsyncRead, BufReader},
17    process::{Child as TokioChild, ChildStderr, ChildStdout},
18};
19
20cfg_if::cfg_if! {
21    if #[cfg(unix)] {
22        #[path = "unix.rs"]
23        mod unix;
24        use unix as os;
25    } else if #[cfg(windows)] {
26        #[path = "windows.rs"]
27        mod windows;
28        use windows as os;
29    } else {
30        compile_error!("unsupported target platform");
31    }
32}
33
34/// A spawned child process along with its file descriptors.
35pub(crate) struct Child {
36    pub child: TokioChild,
37    pub child_fds: ChildFds,
38}
39
40pub(super) fn spawn(
41    mut cmd: std::process::Command,
42    strategy: CaptureStrategy,
43) -> std::io::Result<Child> {
44    cmd.stdin(Stdio::null());
45
46    let combined_rx: Option<PipeReader> = match strategy {
47        CaptureStrategy::None => None,
48        CaptureStrategy::Split => {
49            cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
50            None
51        }
52        CaptureStrategy::Combined => {
53            // We use std::io::pipe() here rather than tokio::net::unix::pipe()
54            // for a couple of reasons:
55            //
56            // * std::io::pipe has the most up-to-date information about things
57            //   like atomic O_CLOEXEC. In particular, mio-pipe 0.1.1 doesn't do
58            //   O_CLOEXEC on platforms like illumos.
59            // * There's no analog to Tokio's anonymous pipes on Windows, while
60            //   std::io::pipe works on all platforms.
61            let (rx, tx) = std::io::pipe()?;
62            cmd.stdout(tx.try_clone()?).stderr(tx);
63            Some(rx)
64        }
65    };
66
67    let mut cmd: tokio::process::Command = cmd.into();
68    let mut child = cmd.spawn()?;
69
70    let output = match strategy {
71        CaptureStrategy::None => ChildFds::new_split(None, None),
72        CaptureStrategy::Split => {
73            let stdout = child.stdout.take().expect("stdout was set");
74            let stderr = child.stderr.take().expect("stderr was set");
75
76            ChildFds::new_split(Some(stdout), Some(stderr))
77        }
78        CaptureStrategy::Combined => ChildFds::new_combined(
79            os::pipe_reader_to_file(combined_rx.expect("combined_fx was set")).into(),
80        ),
81    };
82
83    Ok(Child {
84        child,
85        child_fds: output,
86    })
87}
88
89/// The size of each buffered reader's buffer, and the size at which we grow the combined buffer.
90///
91/// This size is not totally arbitrary, but rather the (normal) page size on most systems.
92const CHUNK_SIZE: usize = 4 * 1024;
93
94/// A `BufReader` over an `AsyncRead` that tracks the state of the reader and
95/// whether it is done.
96pub(crate) struct FusedBufReader<R> {
97    reader: BufReader<R>,
98    done: bool,
99}
100
101impl<R: AsyncRead + Unpin> FusedBufReader<R> {
102    pub(crate) fn new(reader: R) -> Self {
103        Self {
104            reader: BufReader::with_capacity(CHUNK_SIZE, reader),
105            done: false,
106        }
107    }
108
109    pub(crate) async fn fill_buf(&mut self, acc: &mut BytesMut) -> Result<(), io::Error> {
110        if self.done {
111            return Ok(());
112        }
113
114        let res = self.reader.fill_buf().await;
115        match res {
116            Ok(buf) => {
117                acc.extend_from_slice(buf);
118                if buf.is_empty() {
119                    self.done = true;
120                }
121                let len = buf.len();
122                self.reader.consume(len);
123                Ok(())
124            }
125            Err(error) => {
126                self.done = true;
127                Err(error)
128            }
129        }
130    }
131
132    pub(crate) fn is_done(&self) -> bool {
133        self.done
134    }
135}
136
137/// A version of [`FusedBufReader::fill_buf`] that works with an `Option<FusedBufReader>`.
138async fn fill_buf_opt<R: AsyncRead + Unpin>(
139    reader: Option<&mut FusedBufReader<R>>,
140    acc: Option<&mut BytesMut>,
141) -> Result<(), io::Error> {
142    if let Some(reader) = reader {
143        let acc = acc.expect("reader and acc must match");
144        reader.fill_buf(acc).await
145    } else {
146        Ok(())
147    }
148}
149
150/// A version of [`FusedBufReader::is_done`] that works with an `Option<FusedBufReader>`.
151fn is_done_opt<R: AsyncRead + Unpin>(reader: &Option<FusedBufReader<R>>) -> bool {
152    reader.as_ref().is_none_or(|r| r.is_done())
153}
154
155/// Output and result accumulator for a child process.
156pub(crate) struct ChildAccumulator {
157    // TODO: it would be nice to also store the tokio::process::Child here, and
158    // for `fill_buf` to select over it.
159    pub(crate) fds: ChildFds,
160    pub(crate) output: ChildOutputMut,
161    pub(crate) errors: Vec<ChildFdError>,
162}
163
164impl ChildAccumulator {
165    pub(crate) fn new(fds: ChildFds) -> Self {
166        let output = fds.make_acc();
167        Self {
168            fds,
169            output,
170            errors: Vec::new(),
171        }
172    }
173
174    pub(crate) async fn fill_buf(&mut self) {
175        let res = self.fds.fill_buf(&mut self.output).await;
176        if let Err(error) = res {
177            self.errors.push(error);
178        }
179    }
180
181    pub(crate) fn snapshot_in_progress(
182        &self,
183        error_description: &'static str,
184    ) -> ChildExecutionOutput {
185        ChildExecutionOutput::Output {
186            result: None,
187            output: self.output.snapshot(),
188            errors: ErrorList::new(error_description, self.errors.clone()),
189        }
190    }
191}
192
193/// File descriptors (or Windows handles) for the child process.
194pub(crate) enum ChildFds {
195    /// Separate stdout and stderr, or they're not captured.
196    Split {
197        stdout: Option<FusedBufReader<ChildStdout>>,
198        stderr: Option<FusedBufReader<ChildStderr>>,
199    },
200
201    /// Combined stdout and stderr.
202    Combined { combined: FusedBufReader<File> },
203}
204
205impl ChildFds {
206    pub(crate) fn new_split(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) -> Self {
207        Self::Split {
208            stdout: stdout.map(FusedBufReader::new),
209            stderr: stderr.map(FusedBufReader::new),
210        }
211    }
212
213    pub(crate) fn new_combined(rx: File) -> Self {
214        Self::Combined {
215            combined: FusedBufReader::new(rx),
216        }
217    }
218
219    pub(crate) fn is_done(&self) -> bool {
220        match self {
221            Self::Split { stdout, stderr } => is_done_opt(stdout) && is_done_opt(stderr),
222            Self::Combined { combined } => combined.is_done(),
223        }
224    }
225}
226
227impl ChildFds {
228    /// Makes an empty `ChildOutput` with the appropriate buffers for this `ChildFds`.
229    pub(crate) fn make_acc(&self) -> ChildOutputMut {
230        match self {
231            Self::Split { stdout, stderr } => ChildOutputMut::Split {
232                stdout: stdout.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
233                stderr: stderr.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
234            },
235            Self::Combined { .. } => ChildOutputMut::Combined(BytesMut::with_capacity(CHUNK_SIZE)),
236        }
237    }
238
239    /// Fills one of the buffers in `acc` with available data from the child process.
240    ///
241    /// This is a single step in the process of collecting the output of a child process. This
242    /// operation is cancel-safe, since the underlying [`AsyncBufReadExt::fill_buf`] operation is
243    /// cancel-safe.
244    ///
245    /// We follow this "externalized progress" pattern rather than having the collect output futures
246    /// own the data they're collecting, to enable future improvements where we can dump
247    /// currently-captured output to the terminal.
248    pub(crate) async fn fill_buf(&mut self, acc: &mut ChildOutputMut) -> Result<(), ChildFdError> {
249        match self {
250            Self::Split { stdout, stderr } => {
251                let (stdout_acc, stderr_acc) = acc.as_split_mut();
252                // Wait until either of these make progress.
253                tokio::select! {
254                    res = fill_buf_opt(stdout.as_mut(), stdout_acc), if !is_done_opt(stdout) => {
255                        res.map_err(|error| ChildFdError::ReadStdout(Arc::new(error)))
256                    }
257                    res = fill_buf_opt(stderr.as_mut(), stderr_acc), if !is_done_opt(stderr) => {
258                        res.map_err(|error| ChildFdError::ReadStderr(Arc::new(error)))
259                    }
260                    // If both are done, do nothing.
261                    else => {
262                        Ok(())
263                    }
264                }
265            }
266            Self::Combined { combined } => {
267                if !combined.is_done() {
268                    combined
269                        .fill_buf(acc.as_combined_mut())
270                        .await
271                        .map_err(|error| ChildFdError::ReadCombined(Arc::new(error)))
272                } else {
273                    Ok(())
274                }
275            }
276        }
277    }
278}
279
280/// The output of a child process that's currently being collected.
281pub(crate) enum ChildOutputMut {
282    /// Separate stdout and stderr (`None` if not captured).
283    Split {
284        stdout: Option<BytesMut>,
285        stderr: Option<BytesMut>,
286    },
287    /// Combined stdout and stderr.
288    Combined(BytesMut),
289}
290
291impl ChildOutputMut {
292    fn as_split_mut(&mut self) -> (Option<&mut BytesMut>, Option<&mut BytesMut>) {
293        match self {
294            Self::Split { stdout, stderr } => (stdout.as_mut(), stderr.as_mut()),
295            _ => panic!("ChildOutput is not split"),
296        }
297    }
298
299    fn as_combined_mut(&mut self) -> &mut BytesMut {
300        match self {
301            Self::Combined(combined) => combined,
302            _ => panic!("ChildOutput is not combined"),
303        }
304    }
305
306    /// Makes a snapshot of the current output, returning a [`TestOutput`].
307    ///
308    /// This requires cloning the output so it's more expensive than [`Self::freeze`].
309    pub(crate) fn snapshot(&self) -> ChildOutput {
310        match self {
311            Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
312                stdout: stdout.as_ref().map(|x| x.clone().freeze().into()),
313                stderr: stderr.as_ref().map(|x| x.clone().freeze().into()),
314            }),
315            Self::Combined(combined) => ChildOutput::Combined {
316                output: combined.clone().freeze().into(),
317            },
318        }
319    }
320
321    /// Marks the collection as done, returning a `TestOutput`.
322    pub(crate) fn freeze(self) -> ChildOutput {
323        match self {
324            Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
325                stdout: stdout.map(|x| x.freeze().into()),
326                stderr: stderr.map(|x| x.freeze().into()),
327            }),
328            Self::Combined(combined) => ChildOutput::Combined {
329                output: combined.freeze().into(),
330            },
331        }
332    }
333}