nextest_runner/test_command/
imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::{
5    errors::{ChildFdError, ErrorList},
6    test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
7};
8use bytes::BytesMut;
9use std::{
10    io::{self, PipeReader},
11    process::Stdio,
12    sync::Arc,
13};
14use tokio::{
15    fs::File,
16    io::{AsyncBufReadExt, AsyncRead, BufReader},
17    process::{Child as TokioChild, ChildStderr, ChildStdout},
18};
19
20cfg_if::cfg_if! {
21    if #[cfg(unix)] {
22        #[path = "unix.rs"]
23        mod unix;
24        use unix as os;
25    } else if #[cfg(windows)] {
26        #[path = "windows.rs"]
27        mod windows;
28        use windows as os;
29    } else {
30        compile_error!("unsupported target platform");
31    }
32}
33
34/// A spawned child process along with its file descriptors.
35pub(crate) struct Child {
36    pub child: TokioChild,
37    pub child_fds: ChildFds,
38}
39
40pub(super) fn spawn(
41    mut cmd: std::process::Command,
42    strategy: CaptureStrategy,
43    stdin_passthrough: bool,
44) -> std::io::Result<Child> {
45    if stdin_passthrough {
46        cmd.stdin(Stdio::inherit());
47    } else {
48        cmd.stdin(Stdio::null());
49    }
50
51    let combined_rx: Option<PipeReader> = match strategy {
52        CaptureStrategy::None => None,
53        CaptureStrategy::Split => {
54            cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
55            None
56        }
57        CaptureStrategy::Combined => {
58            // We use std::io::pipe() here rather than tokio::net::unix::pipe()
59            // for a couple of reasons:
60            //
61            // * std::io::pipe has the most up-to-date information about things
62            //   like atomic O_CLOEXEC. In particular, mio-pipe 0.1.1 doesn't do
63            //   O_CLOEXEC on platforms like illumos.
64            // * There's no analog to Tokio's anonymous pipes on Windows, while
65            //   std::io::pipe works on all platforms.
66            let (rx, tx) = std::io::pipe()?;
67            cmd.stdout(tx.try_clone()?).stderr(tx);
68            Some(rx)
69        }
70    };
71
72    let mut cmd: tokio::process::Command = cmd.into();
73    let mut child = cmd.spawn()?;
74
75    let output = match strategy {
76        CaptureStrategy::None => ChildFds::new_split(None, None),
77        CaptureStrategy::Split => {
78            let stdout = child.stdout.take().expect("stdout was set");
79            let stderr = child.stderr.take().expect("stderr was set");
80
81            ChildFds::new_split(Some(stdout), Some(stderr))
82        }
83        CaptureStrategy::Combined => ChildFds::new_combined(
84            os::pipe_reader_to_file(combined_rx.expect("combined_fx was set")).into(),
85        ),
86    };
87
88    Ok(Child {
89        child,
90        child_fds: output,
91    })
92}
93
/// Capacity, in bytes, given to each buffered reader and used as the initial
/// capacity of each output accumulation buffer.
///
/// 4 KiB was chosen because it matches the usual page size on most systems.
const CHUNK_SIZE: usize = 4096;
98
99/// A `BufReader` over an `AsyncRead` that tracks the state of the reader and
100/// whether it is done.
101pub(crate) struct FusedBufReader<R> {
102    reader: BufReader<R>,
103    done: bool,
104}
105
106impl<R: AsyncRead + Unpin> FusedBufReader<R> {
107    pub(crate) fn new(reader: R) -> Self {
108        Self {
109            reader: BufReader::with_capacity(CHUNK_SIZE, reader),
110            done: false,
111        }
112    }
113
114    pub(crate) async fn fill_buf(&mut self, acc: &mut BytesMut) -> Result<(), io::Error> {
115        if self.done {
116            return Ok(());
117        }
118
119        let res = self.reader.fill_buf().await;
120        match res {
121            Ok(buf) => {
122                acc.extend_from_slice(buf);
123                if buf.is_empty() {
124                    self.done = true;
125                }
126                let len = buf.len();
127                self.reader.consume(len);
128                Ok(())
129            }
130            Err(error) => {
131                self.done = true;
132                Err(error)
133            }
134        }
135    }
136
137    pub(crate) fn is_done(&self) -> bool {
138        self.done
139    }
140}
141
142/// A version of [`FusedBufReader::fill_buf`] that works with an `Option<FusedBufReader>`.
143async fn fill_buf_opt<R: AsyncRead + Unpin>(
144    reader: Option<&mut FusedBufReader<R>>,
145    acc: Option<&mut BytesMut>,
146) -> Result<(), io::Error> {
147    if let Some(reader) = reader {
148        let acc = acc.expect("reader and acc must match");
149        reader.fill_buf(acc).await
150    } else {
151        Ok(())
152    }
153}
154
155/// A version of [`FusedBufReader::is_done`] that works with an `Option<FusedBufReader>`.
156fn is_done_opt<R: AsyncRead + Unpin>(reader: &Option<FusedBufReader<R>>) -> bool {
157    reader.as_ref().is_none_or(|r| r.is_done())
158}
159
160/// Output and result accumulator for a child process.
161pub(crate) struct ChildAccumulator {
162    // TODO: it would be nice to also store the tokio::process::Child here, and
163    // for `fill_buf` to select over it.
164    pub(crate) fds: ChildFds,
165    pub(crate) output: ChildOutputMut,
166    pub(crate) errors: Vec<ChildFdError>,
167}
168
169impl ChildAccumulator {
170    pub(crate) fn new(fds: ChildFds) -> Self {
171        let output = fds.make_acc();
172        Self {
173            fds,
174            output,
175            errors: Vec::new(),
176        }
177    }
178
179    pub(crate) async fn fill_buf(&mut self) {
180        let res = self.fds.fill_buf(&mut self.output).await;
181        if let Err(error) = res {
182            self.errors.push(error);
183        }
184    }
185
186    pub(crate) fn snapshot_in_progress(
187        &self,
188        error_description: &'static str,
189    ) -> ChildExecutionOutput {
190        ChildExecutionOutput::Output {
191            result: None,
192            output: self.output.snapshot(),
193            errors: ErrorList::new(error_description, self.errors.clone()),
194        }
195    }
196}
197
198/// File descriptors (or Windows handles) for the child process.
199pub(crate) enum ChildFds {
200    /// Separate stdout and stderr, or they're not captured.
201    Split {
202        stdout: Option<FusedBufReader<ChildStdout>>,
203        stderr: Option<FusedBufReader<ChildStderr>>,
204    },
205
206    /// Combined stdout and stderr.
207    Combined { combined: FusedBufReader<File> },
208}
209
210impl ChildFds {
211    pub(crate) fn new_split(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) -> Self {
212        Self::Split {
213            stdout: stdout.map(FusedBufReader::new),
214            stderr: stderr.map(FusedBufReader::new),
215        }
216    }
217
218    pub(crate) fn new_combined(rx: File) -> Self {
219        Self::Combined {
220            combined: FusedBufReader::new(rx),
221        }
222    }
223
224    pub(crate) fn is_done(&self) -> bool {
225        match self {
226            Self::Split { stdout, stderr } => is_done_opt(stdout) && is_done_opt(stderr),
227            Self::Combined { combined } => combined.is_done(),
228        }
229    }
230}
231
232impl ChildFds {
233    /// Makes an empty `ChildOutput` with the appropriate buffers for this `ChildFds`.
234    pub(crate) fn make_acc(&self) -> ChildOutputMut {
235        match self {
236            Self::Split { stdout, stderr } => ChildOutputMut::Split {
237                stdout: stdout.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
238                stderr: stderr.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
239            },
240            Self::Combined { .. } => ChildOutputMut::Combined(BytesMut::with_capacity(CHUNK_SIZE)),
241        }
242    }
243
244    /// Fills one of the buffers in `acc` with available data from the child process.
245    ///
246    /// This is a single step in the process of collecting the output of a child process. This
247    /// operation is cancel-safe, since the underlying [`AsyncBufReadExt::fill_buf`] operation is
248    /// cancel-safe.
249    ///
250    /// We follow this "externalized progress" pattern rather than having the collect output futures
251    /// own the data they're collecting, to enable future improvements where we can dump
252    /// currently-captured output to the terminal.
253    pub(crate) async fn fill_buf(&mut self, acc: &mut ChildOutputMut) -> Result<(), ChildFdError> {
254        match self {
255            Self::Split { stdout, stderr } => {
256                let (stdout_acc, stderr_acc) = acc.as_split_mut();
257                // Wait until either of these make progress.
258                tokio::select! {
259                    res = fill_buf_opt(stdout.as_mut(), stdout_acc), if !is_done_opt(stdout) => {
260                        res.map_err(|error| ChildFdError::ReadStdout(Arc::new(error)))
261                    }
262                    res = fill_buf_opt(stderr.as_mut(), stderr_acc), if !is_done_opt(stderr) => {
263                        res.map_err(|error| ChildFdError::ReadStderr(Arc::new(error)))
264                    }
265                    // If both are done, do nothing.
266                    else => {
267                        Ok(())
268                    }
269                }
270            }
271            Self::Combined { combined } => {
272                if !combined.is_done() {
273                    combined
274                        .fill_buf(acc.as_combined_mut())
275                        .await
276                        .map_err(|error| ChildFdError::ReadCombined(Arc::new(error)))
277                } else {
278                    Ok(())
279                }
280            }
281        }
282    }
283}
284
285/// The output of a child process that's currently being collected.
286pub(crate) enum ChildOutputMut {
287    /// Separate stdout and stderr (`None` if not captured).
288    Split {
289        stdout: Option<BytesMut>,
290        stderr: Option<BytesMut>,
291    },
292    /// Combined stdout and stderr.
293    Combined(BytesMut),
294}
295
296impl ChildOutputMut {
297    fn as_split_mut(&mut self) -> (Option<&mut BytesMut>, Option<&mut BytesMut>) {
298        match self {
299            Self::Split { stdout, stderr } => (stdout.as_mut(), stderr.as_mut()),
300            _ => panic!("ChildOutput is not split"),
301        }
302    }
303
304    fn as_combined_mut(&mut self) -> &mut BytesMut {
305        match self {
306            Self::Combined(combined) => combined,
307            _ => panic!("ChildOutput is not combined"),
308        }
309    }
310
311    /// Makes a snapshot of the current output, returning a [`TestOutput`].
312    ///
313    /// This requires cloning the output so it's more expensive than [`Self::freeze`].
314    pub(crate) fn snapshot(&self) -> ChildOutput {
315        match self {
316            Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
317                stdout: stdout.as_ref().map(|x| x.clone().freeze().into()),
318                stderr: stderr.as_ref().map(|x| x.clone().freeze().into()),
319            }),
320            Self::Combined(combined) => ChildOutput::Combined {
321                output: combined.clone().freeze().into(),
322            },
323        }
324    }
325
326    /// Marks the collection as done, returning a `TestOutput`.
327    pub(crate) fn freeze(self) -> ChildOutput {
328        match self {
329            Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
330                stdout: stdout.map(|x| x.freeze().into()),
331                stderr: stderr.map(|x| x.freeze().into()),
332            }),
333            Self::Combined(combined) => ChildOutput::Combined {
334                output: combined.freeze().into(),
335            },
336        }
337    }
338
339    /// Returns the lengths of stdout and stderr in bytes.
340    ///
341    /// Returns `None` for each stream that wasn't captured.
342    pub(crate) fn stdout_stderr_len(&self) -> (Option<u64>, Option<u64>) {
343        match self {
344            Self::Split { stdout, stderr } => (
345                stdout.as_ref().map(|b| b.len() as u64),
346                stderr.as_ref().map(|b| b.len() as u64),
347            ),
348            Self::Combined(combined) => (Some(combined.len() as u64), None),
349        }
350    }
351}