nextest_runner/test_command/
imp.rs1use crate::{
5 errors::{ChildFdError, ErrorList},
6 test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
7};
8use bytes::BytesMut;
9use std::{
10 io::{self, PipeReader},
11 process::Stdio,
12 sync::Arc,
13};
14use tokio::{
15 fs::File,
16 io::{AsyncBufReadExt, AsyncRead, BufReader},
17 process::{Child as TokioChild, ChildStderr, ChildStdout},
18};
19
20cfg_if::cfg_if! {
21 if #[cfg(unix)] {
22 #[path = "unix.rs"]
23 mod unix;
24 use unix as os;
25 } else if #[cfg(windows)] {
26 #[path = "windows.rs"]
27 mod windows;
28 use windows as os;
29 } else {
30 compile_error!("unsupported target platform");
31 }
32}
33
34pub(crate) struct Child {
36 pub child: TokioChild,
37 pub child_fds: ChildFds,
38}
39
40pub(super) fn spawn(
41 mut cmd: std::process::Command,
42 strategy: CaptureStrategy,
43 stdin_passthrough: bool,
44) -> std::io::Result<Child> {
45 if stdin_passthrough {
46 cmd.stdin(Stdio::inherit());
47 } else {
48 cmd.stdin(Stdio::null());
49 }
50
51 let combined_rx: Option<PipeReader> = match strategy {
52 CaptureStrategy::None => None,
53 CaptureStrategy::Split => {
54 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
55 None
56 }
57 CaptureStrategy::Combined => {
58 let (rx, tx) = std::io::pipe()?;
67 cmd.stdout(tx.try_clone()?).stderr(tx);
68 Some(rx)
69 }
70 };
71
72 let mut cmd: tokio::process::Command = cmd.into();
73 let mut child = cmd.spawn()?;
74
75 let output = match strategy {
76 CaptureStrategy::None => ChildFds::new_split(None, None),
77 CaptureStrategy::Split => {
78 let stdout = child.stdout.take().expect("stdout was set");
79 let stderr = child.stderr.take().expect("stderr was set");
80
81 ChildFds::new_split(Some(stdout), Some(stderr))
82 }
83 CaptureStrategy::Combined => ChildFds::new_combined(
84 os::pipe_reader_to_file(combined_rx.expect("combined_fx was set")).into(),
85 ),
86 };
87
88 Ok(Child {
89 child,
90 child_fds: output,
91 })
92}
93
94const CHUNK_SIZE: usize = 4 * 1024;
98
99pub(crate) struct FusedBufReader<R> {
102 reader: BufReader<R>,
103 done: bool,
104}
105
106impl<R: AsyncRead + Unpin> FusedBufReader<R> {
107 pub(crate) fn new(reader: R) -> Self {
108 Self {
109 reader: BufReader::with_capacity(CHUNK_SIZE, reader),
110 done: false,
111 }
112 }
113
114 pub(crate) async fn fill_buf(&mut self, acc: &mut BytesMut) -> Result<(), io::Error> {
115 if self.done {
116 return Ok(());
117 }
118
119 let res = self.reader.fill_buf().await;
120 match res {
121 Ok(buf) => {
122 acc.extend_from_slice(buf);
123 if buf.is_empty() {
124 self.done = true;
125 }
126 let len = buf.len();
127 self.reader.consume(len);
128 Ok(())
129 }
130 Err(error) => {
131 self.done = true;
132 Err(error)
133 }
134 }
135 }
136
137 pub(crate) fn is_done(&self) -> bool {
138 self.done
139 }
140}
141
142async fn fill_buf_opt<R: AsyncRead + Unpin>(
144 reader: Option<&mut FusedBufReader<R>>,
145 acc: Option<&mut BytesMut>,
146) -> Result<(), io::Error> {
147 if let Some(reader) = reader {
148 let acc = acc.expect("reader and acc must match");
149 reader.fill_buf(acc).await
150 } else {
151 Ok(())
152 }
153}
154
155fn is_done_opt<R: AsyncRead + Unpin>(reader: &Option<FusedBufReader<R>>) -> bool {
157 reader.as_ref().is_none_or(|r| r.is_done())
158}
159
160pub(crate) struct ChildAccumulator {
162 pub(crate) fds: ChildFds,
165 pub(crate) output: ChildOutputMut,
166 pub(crate) errors: Vec<ChildFdError>,
167}
168
169impl ChildAccumulator {
170 pub(crate) fn new(fds: ChildFds) -> Self {
171 let output = fds.make_acc();
172 Self {
173 fds,
174 output,
175 errors: Vec::new(),
176 }
177 }
178
179 pub(crate) async fn fill_buf(&mut self) {
180 let res = self.fds.fill_buf(&mut self.output).await;
181 if let Err(error) = res {
182 self.errors.push(error);
183 }
184 }
185
186 pub(crate) fn snapshot_in_progress(
187 &self,
188 error_description: &'static str,
189 ) -> ChildExecutionOutput {
190 ChildExecutionOutput::Output {
191 result: None,
192 output: self.output.snapshot(),
193 errors: ErrorList::new(error_description, self.errors.clone()),
194 }
195 }
196}
197
198pub(crate) enum ChildFds {
200 Split {
202 stdout: Option<FusedBufReader<ChildStdout>>,
203 stderr: Option<FusedBufReader<ChildStderr>>,
204 },
205
206 Combined { combined: FusedBufReader<File> },
208}
209
210impl ChildFds {
211 pub(crate) fn new_split(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) -> Self {
212 Self::Split {
213 stdout: stdout.map(FusedBufReader::new),
214 stderr: stderr.map(FusedBufReader::new),
215 }
216 }
217
218 pub(crate) fn new_combined(rx: File) -> Self {
219 Self::Combined {
220 combined: FusedBufReader::new(rx),
221 }
222 }
223
224 pub(crate) fn is_done(&self) -> bool {
225 match self {
226 Self::Split { stdout, stderr } => is_done_opt(stdout) && is_done_opt(stderr),
227 Self::Combined { combined } => combined.is_done(),
228 }
229 }
230}
231
232impl ChildFds {
233 pub(crate) fn make_acc(&self) -> ChildOutputMut {
235 match self {
236 Self::Split { stdout, stderr } => ChildOutputMut::Split {
237 stdout: stdout.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
238 stderr: stderr.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
239 },
240 Self::Combined { .. } => ChildOutputMut::Combined(BytesMut::with_capacity(CHUNK_SIZE)),
241 }
242 }
243
244 pub(crate) async fn fill_buf(&mut self, acc: &mut ChildOutputMut) -> Result<(), ChildFdError> {
254 match self {
255 Self::Split { stdout, stderr } => {
256 let (stdout_acc, stderr_acc) = acc.as_split_mut();
257 tokio::select! {
259 res = fill_buf_opt(stdout.as_mut(), stdout_acc), if !is_done_opt(stdout) => {
260 res.map_err(|error| ChildFdError::ReadStdout(Arc::new(error)))
261 }
262 res = fill_buf_opt(stderr.as_mut(), stderr_acc), if !is_done_opt(stderr) => {
263 res.map_err(|error| ChildFdError::ReadStderr(Arc::new(error)))
264 }
265 else => {
267 Ok(())
268 }
269 }
270 }
271 Self::Combined { combined } => {
272 if !combined.is_done() {
273 combined
274 .fill_buf(acc.as_combined_mut())
275 .await
276 .map_err(|error| ChildFdError::ReadCombined(Arc::new(error)))
277 } else {
278 Ok(())
279 }
280 }
281 }
282 }
283}
284
285pub(crate) enum ChildOutputMut {
287 Split {
289 stdout: Option<BytesMut>,
290 stderr: Option<BytesMut>,
291 },
292 Combined(BytesMut),
294}
295
296impl ChildOutputMut {
297 fn as_split_mut(&mut self) -> (Option<&mut BytesMut>, Option<&mut BytesMut>) {
298 match self {
299 Self::Split { stdout, stderr } => (stdout.as_mut(), stderr.as_mut()),
300 _ => panic!("ChildOutput is not split"),
301 }
302 }
303
304 fn as_combined_mut(&mut self) -> &mut BytesMut {
305 match self {
306 Self::Combined(combined) => combined,
307 _ => panic!("ChildOutput is not combined"),
308 }
309 }
310
311 pub(crate) fn snapshot(&self) -> ChildOutput {
315 match self {
316 Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
317 stdout: stdout.as_ref().map(|x| x.clone().freeze().into()),
318 stderr: stderr.as_ref().map(|x| x.clone().freeze().into()),
319 }),
320 Self::Combined(combined) => ChildOutput::Combined {
321 output: combined.clone().freeze().into(),
322 },
323 }
324 }
325
326 pub(crate) fn freeze(self) -> ChildOutput {
328 match self {
329 Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
330 stdout: stdout.map(|x| x.freeze().into()),
331 stderr: stderr.map(|x| x.freeze().into()),
332 }),
333 Self::Combined(combined) => ChildOutput::Combined {
334 output: combined.freeze().into(),
335 },
336 }
337 }
338
339 pub(crate) fn stdout_stderr_len(&self) -> (Option<u64>, Option<u64>) {
343 match self {
344 Self::Split { stdout, stderr } => (
345 stdout.as_ref().map(|b| b.len() as u64),
346 stderr.as_ref().map(|b| b.len() as u64),
347 ),
348 Self::Combined(combined) => (Some(combined.len() as u64), None),
349 }
350 }
351}