nextest_runner/test_command/
imp.rs1use crate::{
5 errors::{ChildFdError, ErrorList},
6 test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
7};
8use bytes::BytesMut;
9use std::{io, process::Stdio, sync::Arc};
10use tokio::{
11 fs::File,
12 io::{AsyncBufReadExt, AsyncRead, BufReader},
13 process::{Child as TokioChild, ChildStderr, ChildStdout},
14};
15
16cfg_if::cfg_if! {
17 if #[cfg(unix)] {
18 #[path = "unix.rs"]
19 mod unix;
20 use unix as os;
21 } else if #[cfg(windows)] {
22 #[path = "windows.rs"]
23 mod windows;
24 use windows as os;
25 } else {
26 compile_error!("unsupported target platform");
27 }
28}
29
30pub(crate) struct Child {
32 pub child: TokioChild,
33 pub child_fds: ChildFds,
34}
35
36pub(super) fn spawn(
37 mut cmd: std::process::Command,
38 strategy: CaptureStrategy,
39) -> std::io::Result<Child> {
40 cmd.stdin(Stdio::null());
41
42 let state: Option<os::State> = match strategy {
43 CaptureStrategy::None => None,
44 CaptureStrategy::Split => {
45 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
46 None
47 }
48 CaptureStrategy::Combined => Some(os::setup_io(&mut cmd)?),
49 };
50
51 let mut cmd: tokio::process::Command = cmd.into();
52 let mut child = cmd.spawn()?;
53
54 let output = match strategy {
55 CaptureStrategy::None => ChildFds::new_split(None, None),
56 CaptureStrategy::Split => {
57 let stdout = child.stdout.take().expect("stdout was set");
58 let stderr = child.stderr.take().expect("stderr was set");
59
60 ChildFds::new_split(Some(stdout), Some(stderr))
61 }
62 CaptureStrategy::Combined => {
63 ChildFds::new_combined(std::fs::File::from(state.expect("state was set").ours).into())
64 }
65 };
66
67 Ok(Child {
68 child,
69 child_fds: output,
70 })
71}
72
/// Buffer size in bytes (4 KiB).
///
/// Used both as the capacity of each `FusedBufReader`'s internal buffer and as the initial
/// capacity of every output accumulator.
const CHUNK_SIZE: usize = 4096;
77
78pub(crate) struct FusedBufReader<R> {
81 reader: BufReader<R>,
82 done: bool,
83}
84
85impl<R: AsyncRead + Unpin> FusedBufReader<R> {
86 pub(crate) fn new(reader: R) -> Self {
87 Self {
88 reader: BufReader::with_capacity(CHUNK_SIZE, reader),
89 done: false,
90 }
91 }
92
93 pub(crate) async fn fill_buf(&mut self, acc: &mut BytesMut) -> Result<(), io::Error> {
94 if self.done {
95 return Ok(());
96 }
97
98 let res = self.reader.fill_buf().await;
99 match res {
100 Ok(buf) => {
101 acc.extend_from_slice(buf);
102 if buf.is_empty() {
103 self.done = true;
104 }
105 let len = buf.len();
106 self.reader.consume(len);
107 Ok(())
108 }
109 Err(error) => {
110 self.done = true;
111 Err(error)
112 }
113 }
114 }
115
116 pub(crate) fn is_done(&self) -> bool {
117 self.done
118 }
119}
120
121async fn fill_buf_opt<R: AsyncRead + Unpin>(
123 reader: Option<&mut FusedBufReader<R>>,
124 acc: Option<&mut BytesMut>,
125) -> Result<(), io::Error> {
126 if let Some(reader) = reader {
127 let acc = acc.expect("reader and acc must match");
128 reader.fill_buf(acc).await
129 } else {
130 Ok(())
131 }
132}
133
134fn is_done_opt<R: AsyncRead + Unpin>(reader: &Option<FusedBufReader<R>>) -> bool {
136 reader.as_ref().is_none_or(|r| r.is_done())
137}
138
139pub(crate) struct ChildAccumulator {
141 pub(crate) fds: ChildFds,
144 pub(crate) output: ChildOutputMut,
145 pub(crate) errors: Vec<ChildFdError>,
146}
147
148impl ChildAccumulator {
149 pub(crate) fn new(fds: ChildFds) -> Self {
150 let output = fds.make_acc();
151 Self {
152 fds,
153 output,
154 errors: Vec::new(),
155 }
156 }
157
158 pub(crate) async fn fill_buf(&mut self) {
159 let res = self.fds.fill_buf(&mut self.output).await;
160 if let Err(error) = res {
161 self.errors.push(error);
162 }
163 }
164
165 pub(crate) fn snapshot_in_progress(
166 &self,
167 error_description: &'static str,
168 ) -> ChildExecutionOutput {
169 ChildExecutionOutput::Output {
170 result: None,
171 output: self.output.snapshot(),
172 errors: ErrorList::new(error_description, self.errors.clone()),
173 }
174 }
175}
176
177pub(crate) enum ChildFds {
179 Split {
181 stdout: Option<FusedBufReader<ChildStdout>>,
182 stderr: Option<FusedBufReader<ChildStderr>>,
183 },
184
185 Combined { combined: FusedBufReader<File> },
187}
188
189impl ChildFds {
190 pub(crate) fn new_split(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) -> Self {
191 Self::Split {
192 stdout: stdout.map(FusedBufReader::new),
193 stderr: stderr.map(FusedBufReader::new),
194 }
195 }
196
197 pub(crate) fn new_combined(file: File) -> Self {
198 Self::Combined {
199 combined: FusedBufReader::new(file),
200 }
201 }
202
203 pub(crate) fn is_done(&self) -> bool {
204 match self {
205 Self::Split { stdout, stderr } => is_done_opt(stdout) && is_done_opt(stderr),
206 Self::Combined { combined } => combined.is_done(),
207 }
208 }
209}
210
211impl ChildFds {
212 pub(crate) fn make_acc(&self) -> ChildOutputMut {
214 match self {
215 Self::Split { stdout, stderr } => ChildOutputMut::Split {
216 stdout: stdout.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
217 stderr: stderr.as_ref().map(|_| BytesMut::with_capacity(CHUNK_SIZE)),
218 },
219 Self::Combined { .. } => ChildOutputMut::Combined(BytesMut::with_capacity(CHUNK_SIZE)),
220 }
221 }
222
223 pub(crate) async fn fill_buf(&mut self, acc: &mut ChildOutputMut) -> Result<(), ChildFdError> {
233 match self {
234 Self::Split { stdout, stderr } => {
235 let (stdout_acc, stderr_acc) = acc.as_split_mut();
236 tokio::select! {
238 res = fill_buf_opt(stdout.as_mut(), stdout_acc), if !is_done_opt(stdout) => {
239 res.map_err(|error| ChildFdError::ReadStdout(Arc::new(error)))
240 }
241 res = fill_buf_opt(stderr.as_mut(), stderr_acc), if !is_done_opt(stderr) => {
242 res.map_err(|error| ChildFdError::ReadStderr(Arc::new(error)))
243 }
244 else => {
246 Ok(())
247 }
248 }
249 }
250 Self::Combined { combined } => {
251 if !combined.is_done() {
252 combined
253 .fill_buf(acc.as_combined_mut())
254 .await
255 .map_err(|error| ChildFdError::ReadCombined(Arc::new(error)))
256 } else {
257 Ok(())
258 }
259 }
260 }
261 }
262}
263
264pub(crate) enum ChildOutputMut {
266 Split {
268 stdout: Option<BytesMut>,
269 stderr: Option<BytesMut>,
270 },
271 Combined(BytesMut),
273}
274
275impl ChildOutputMut {
276 fn as_split_mut(&mut self) -> (Option<&mut BytesMut>, Option<&mut BytesMut>) {
277 match self {
278 Self::Split { stdout, stderr } => (stdout.as_mut(), stderr.as_mut()),
279 _ => panic!("ChildOutput is not split"),
280 }
281 }
282
283 fn as_combined_mut(&mut self) -> &mut BytesMut {
284 match self {
285 Self::Combined(combined) => combined,
286 _ => panic!("ChildOutput is not combined"),
287 }
288 }
289
290 pub(crate) fn snapshot(&self) -> ChildOutput {
294 match self {
295 Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
296 stdout: stdout.as_ref().map(|x| x.clone().freeze().into()),
297 stderr: stderr.as_ref().map(|x| x.clone().freeze().into()),
298 }),
299 Self::Combined(combined) => ChildOutput::Combined {
300 output: combined.clone().freeze().into(),
301 },
302 }
303 }
304
305 pub(crate) fn freeze(self) -> ChildOutput {
307 match self {
308 Self::Split { stdout, stderr } => ChildOutput::Split(ChildSplitOutput {
309 stdout: stdout.map(|x| x.freeze().into()),
310 stderr: stderr.map(|x| x.freeze().into()),
311 }),
312 Self::Combined(combined) => ChildOutput::Combined {
313 output: combined.freeze().into(),
314 },
315 }
316 }
317}