nextest_runner/test_command/
imp.rs1use crate::{
5 errors::{ChildFdError, ErrorList},
6 test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
7};
8use bytes::BytesMut;
9use std::{
10 io::{self, PipeReader},
11 process::Stdio,
12 sync::Arc,
13};
14use tokio::{
15 fs::File,
16 io::{AsyncBufReadExt, AsyncRead, BufReader},
17 process::{Child as TokioChild, ChildStderr, ChildStdout},
18};
19
20cfg_if::cfg_if! {
21 if #[cfg(unix)] {
22 #[path = "unix.rs"]
23 mod unix;
24 use unix as os;
25 } else if #[cfg(windows)] {
26 #[path = "windows.rs"]
27 mod windows;
28 use windows as os;
29 } else {
30 compile_error!("unsupported target platform");
31 }
32}
33
34pub(crate) struct Child {
36 pub child: TokioChild,
37 pub child_fds: ChildFds,
38}
39
40pub(super) fn spawn(
41 mut cmd: std::process::Command,
42 strategy: CaptureStrategy,
43) -> std::io::Result<Child> {
44 cmd.stdin(Stdio::null());
45
46 let combined_rx: Option<PipeReader> = match strategy {
47 CaptureStrategy::None => None,
48 CaptureStrategy::Split => {
49 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
50 None
51 }
52 CaptureStrategy::Combined => {
53 let (rx, tx) = std::io::pipe()?;
62 cmd.stdout(tx.try_clone()?).stderr(tx);
63 Some(rx)
64 }
65 };
66
67 let mut cmd: tokio::process::Command = cmd.into();
68 let mut child = cmd.spawn()?;
69
70 let output = match strategy {
71 CaptureStrategy::None => ChildFds::new_split(None, None),
72 CaptureStrategy::Split => {
73 let stdout = child.stdout.take().expect("stdout was set");
74 let stderr = child.stderr.take().expect("stderr was set");
75
76 ChildFds::new_split(Some(stdout), Some(stderr))
77 }
78 CaptureStrategy::Combined => ChildFds::new_combined(
79 os::pipe_reader_to_file(combined_rx.expect("combined_fx was set")).into(),
80 ),
81 };
82
83 Ok(Child {
84 child,
85 child_fds: output,
86 })
87}
88
89const CHUNK_SIZE: usize = 4 * 1024;
93
94pub(crate) struct FusedBufReader<R> {
97 reader: BufReader<R>,
98 done: bool,
99}
100
101impl<R: AsyncRead + Unpin> FusedBufReader<R> {
102 pub(crate) fn new(reader: R) -> Self {
103 Self {
104 reader: BufReader::with_capacity(CHUNK_SIZE, reader),
105 done: false,
106 }
107 }
108
109 pub(crate) async fn fill_buf(&mut self, acc: &mut BytesMut) -> Result<(), io::Error> {
110 if self.done {
111 return Ok(());
112 }
113
114 let res = self.reader.fill_buf().await;
115 match res {
116 Ok(buf) => {
117 acc.extend_from_slice(buf);
118 if buf.is_empty() {
119 self.done = true;
120 }
121 let len = buf.len();
122 self.reader.consume(len);
123 Ok(())
124 }
125 Err(error) => {
126 self.done = true;
127 Err(error)
128 }
129 }
130 }
131
132 pub(crate) fn is_done(&self) -> bool {
133 self.done
134 }
135}
136
137async fn fill_buf_opt<R: AsyncRead + Unpin>(
139 reader: Option<&mut FusedBufReader<R>>,
140 acc: Option<&mut BytesMut>,
141) -> Result<(), io::Error> {
142 if let Some(reader) = reader {
143 let acc = acc.expect("reader and acc must match");
144 reader.fill_buf(acc).await
145 } else {
146 Ok(())
147 }
148}
149
150fn is_done_opt<R: AsyncRead + Unpin>(reader: &Option<FusedBufReader<R>>) -> bool {
152 reader.as_ref().is_none_or(|r| r.is_done())
153}
154
155pub(crate) struct ChildAccumulator {
157 pub(crate) fds: ChildFds,
160 pub(crate) output: ChildOutputMut,
161 pub(crate) errors: Vec<ChildFdError>,
162}
163
164impl ChildAccumulator {
165 pub(crate) fn new(fds: ChildFds) -> Self {
166 let output = fds.make_acc();
167 Self {
168 fds,
169 output,
170 errors: Vec::new(),
171 }
172 }
173
174 pub(crate) async fn fill_buf(&mut self) {
175 let res = self.fds.fill_buf(&mut self.output).await;
176 if let Err(error) = res {
177 self.errors.push(error);
178 }
179 }
180
181 pub(crate) fn snapshot_in_progress(
182 &self,
183 error_description: &'static str,
184 ) -> ChildExecutionOutput {
185 ChildExecutionOutput::Output {
186 result: None,
187 output: self.output.snapshot(),
188 errors: ErrorList::new(error_description, self.errors.clone()),
189 }
190 }
191}
192
193pub(crate) enum ChildFds {
195 Split {
197 stdout: Option<FusedBufReader<ChildStdout>>,
198 stderr: Option<FusedBufReader<ChildStderr>>,
199 },
200
201 Combined { combined: FusedBufReader<File> },
203}
204
205impl ChildFds {
206 pub(crate) fn new_split(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) -> Self {
207 Self::Split {
208 stdout: stdout.map(FusedBufReader::new),
209 stderr: stderr.map(FusedBufReader::new),
210 }
211 }
212
213 pub(crate) fn new_combined(rx: File) -> Self {
214 Self::Combined {
215 combined: FusedBufReader::new(rx),
216 }
217 }
218
219 pub(crate) fn is_done(&self) -> bool {
220 match self {
221 Self::Split { stdout, stderr } => is_done_opt(stdout) && is_done_opt(stderr),
222 Self::Combined { combined } => combined.is_done(),
223 }
224 }
225}
226
227impl ChildFds {
228 pub(crate) fn make_acc(&self) -> ChildOutputMut {
230 match self {
231 Self::Split { stdout, stderr } => ChildOutputMut::Split {
232 stdout: stdout.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
233 stderr: stderr.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
234 },
235 Self::Combined { .. } => ChildOutputMut::Combined(BytesMut::with_capacity(CHUNK_SIZE)),
236 }
237 }
238
239 pub(crate) async fn fill_buf(&mut self, acc: &mut ChildOutputMut) -> Result<(), ChildFdError> {
249 match self {
250 Self::Split { stdout, stderr } => {
251 let (stdout_acc, stderr_acc) = acc.as_split_mut();
252 tokio::select! {
254 res = fill_buf_opt(stdout.as_mut(), stdout_acc), if !is_done_opt(stdout) => {
255 res.map_err(|error| ChildFdError::ReadStdout(Arc::new(error)))
256 }
257 res = fill_buf_opt(stderr.as_mut(), stderr_acc), if !is_done_opt(stderr) => {
258 res.map_err(|error| ChildFdError::ReadStderr(Arc::new(error)))
259 }
260 else => {
262 Ok(())
263 }
264 }
265 }
266 Self::Combined { combined } => {
267 if !combined.is_done() {
268 combined
269 .fill_buf(acc.as_combined_mut())
270 .await
271 .map_err(|error| ChildFdError::ReadCombined(Arc::new(error)))
272 } else {
273 Ok(())
274 }
275 }
276 }
277 }
278}
279
280pub(crate) enum ChildOutputMut {
282 Split {
284 stdout: Option<BytesMut>,
285 stderr: Option<BytesMut>,
286 },
287 Combined(BytesMut),
289}
290
291impl ChildOutputMut {
292 fn as_split_mut(&mut self) -> (Option<&mut BytesMut>, Option<&mut BytesMut>) {
293 match self {
294 Self::Split { stdout, stderr } => (stdout.as_mut(), stderr.as_mut()),
295 _ => panic!("ChildOutput is not split"),
296 }
297 }
298
299 fn as_combined_mut(&mut self) -> &mut BytesMut {
300 match self {
301 Self::Combined(combined) => combined,
302 _ => panic!("ChildOutput is not combined"),
303 }
304 }
305
306 pub(crate) fn snapshot(&self) -> ChildOutput {
310 match self {
311 Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
312 stdout: stdout.as_ref().map(|x| x.clone().freeze().into()),
313 stderr: stderr.as_ref().map(|x| x.clone().freeze().into()),
314 }),
315 Self::Combined(combined) => ChildOutput::Combined {
316 output: combined.clone().freeze().into(),
317 },
318 }
319 }
320
321 pub(crate) fn freeze(self) -> ChildOutput {
323 match self {
324 Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
325 stdout: stdout.map(|x| x.freeze().into()),
326 stderr: stderr.map(|x| x.freeze().into()),
327 }),
328 Self::Combined(combined) => ChildOutput::Combined {
329 output: combined.freeze().into(),
330 },
331 }
332 }
333}