nextest_runner/runner/
executor.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! The executor for tests.
5//!
6//! This component is responsible for running tests and reporting results to the
7//! dispatcher.
8//!
9//! Note that the executor itself does not communicate directly with the outside
10//! world. All communication is mediated by the dispatcher -- doing so is not
11//! just a better abstraction, it also provides a better user experience (less
12//! inconsistent state).
13
14use super::HandleSignalResult;
15use crate::{
16    config::{
17        EvaluatableProfile, LeakTimeout, LeakTimeoutResult, RetryPolicy, ScriptId,
18        SetupScriptCommand, SetupScriptConfig, SetupScriptExecuteData, SlowTimeout, TestGroup,
19        TestSettings,
20    },
21    double_spawn::DoubleSpawnInfo,
22    errors::{ChildError, ChildFdError, ChildStartError, ErrorList},
23    list::{TestExecuteContext, TestInstance, TestInstanceWithSettings, TestList},
24    reporter::events::{
25        ExecutionResult, FailureStatus, InfoResponse, RetryData, SetupScriptInfoResponse,
26        TestInfoResponse, UnitKind, UnitState,
27    },
28    runner::{
29        ExecutorEvent, InternalExecuteStatus, InternalSetupScriptExecuteStatus,
30        InternalTerminateReason, RunUnitQuery, RunUnitRequest, SignalRequest, UnitExecuteStatus,
31        parse_env_file,
32    },
33    target_runner::TargetRunner,
34    test_command::{ChildAccumulator, ChildFds},
35    test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
36    time::{PausableSleep, StopwatchStart},
37};
38use future_queue::FutureQueueContext;
39use nextest_metadata::FilterMatch;
40use quick_junit::ReportUuid;
41use rand::{Rng, distr::OpenClosed01};
42use std::{
43    fmt,
44    num::NonZeroUsize,
45    pin::Pin,
46    process::{ExitStatus, Stdio},
47    sync::Arc,
48    time::Duration,
49};
50use tokio::{
51    process::Child,
52    sync::{
53        mpsc::{UnboundedReceiver, UnboundedSender},
54        oneshot,
55    },
56};
57use tracing::{debug, instrument};
58
59#[derive(Debug)]
60pub(super) struct ExecutorContext<'a> {
61    run_id: ReportUuid,
62    profile: &'a EvaluatableProfile<'a>,
63    test_list: &'a TestList<'a>,
64    double_spawn: DoubleSpawnInfo,
65    target_runner: TargetRunner,
66    capture_strategy: CaptureStrategy,
67    // This is Some if the user specifies a retry policy over the command-line.
68    force_retries: Option<RetryPolicy>,
69}
70
71impl<'a> ExecutorContext<'a> {
72    pub(super) fn new(
73        run_id: ReportUuid,
74        profile: &'a EvaluatableProfile<'a>,
75        test_list: &'a TestList<'a>,
76        double_spawn: DoubleSpawnInfo,
77        target_runner: TargetRunner,
78        capture_strategy: CaptureStrategy,
79        force_retries: Option<RetryPolicy>,
80    ) -> Self {
81        Self {
82            run_id,
83            profile,
84            test_list,
85            double_spawn,
86            target_runner,
87            capture_strategy,
88            force_retries,
89        }
90    }
91
92    /// Run scripts, returning data about each successfully executed script.
93    pub(super) async fn run_setup_scripts(
94        &self,
95        resp_tx: UnboundedSender<ExecutorEvent<'a>>,
96    ) -> SetupScriptExecuteData<'a> {
97        let setup_scripts = self.profile.setup_scripts(self.test_list);
98        let total = setup_scripts.len();
99        debug!("running {} setup scripts", total);
100
101        let mut setup_script_data = SetupScriptExecuteData::new();
102
103        // Run setup scripts one by one.
104        for (index, script) in setup_scripts.into_iter().enumerate() {
105            let this_resp_tx = resp_tx.clone();
106
107            let script_id = script.id.clone();
108            let config = script.config;
109            let program = config.command.program(
110                self.test_list.workspace_root(),
111                &self.test_list.rust_build_meta().target_directory,
112            );
113
114            let script_fut = async move {
115                let (req_rx_tx, req_rx_rx) = oneshot::channel();
116                let _ = this_resp_tx.send(ExecutorEvent::SetupScriptStarted {
117                    script_id: script_id.clone(),
118                    config,
119                    program: program.clone(),
120                    index,
121                    total,
122                    req_rx_tx,
123                });
124                let mut req_rx = match req_rx_rx.await {
125                    Ok(req_rx) => req_rx,
126                    Err(_) => {
127                        // The receiver was dropped -- the dispatcher has
128                        // signaled that this unit should exit.
129                        return None;
130                    }
131                };
132
133                let packet = SetupScriptPacket {
134                    script_id: script_id.clone(),
135                    config,
136                    program: program.clone(),
137                };
138
139                let status = self
140                    .run_setup_script(packet, &this_resp_tx, &mut req_rx)
141                    .await;
142
143                // Drain the request receiver, responding to any final requests
144                // that may have been sent.
145                drain_req_rx(req_rx, UnitExecuteStatus::SetupScript(&status));
146
147                let status = status.into_external();
148                let env_map = status.env_map.clone();
149
150                let _ = this_resp_tx.send(ExecutorEvent::SetupScriptFinished {
151                    script_id,
152                    config,
153                    program,
154                    index,
155                    total,
156                    status,
157                });
158
159                env_map.map(|env_map| (script, env_map))
160            };
161
162            // Run this setup script to completion.
163            if let Some((script, env_map)) = script_fut.await {
164                setup_script_data.add_script(script, env_map);
165            }
166        }
167
168        setup_script_data
169    }
170
171    /// Returns a future that runs all attempts of a single test instance.
172    pub(super) async fn run_test_instance(
173        &self,
174        test: TestInstanceWithSettings<'a>,
175        cx: FutureQueueContext,
176        resp_tx: UnboundedSender<ExecutorEvent<'a>>,
177        setup_script_data: Arc<SetupScriptExecuteData<'a>>,
178    ) {
179        debug!(test_name = test.instance.name, "running test");
180
181        let settings = Arc::new(test.settings);
182
183        let retry_policy = self.force_retries.unwrap_or_else(|| settings.retries());
184        let total_attempts = retry_policy.count() + 1;
185        let mut backoff_iter = BackoffIter::new(retry_policy);
186
187        if let FilterMatch::Mismatch { reason } = test.instance.test_info.filter_match {
188            debug_assert!(
189                false,
190                "this test should already have been skipped in a filter step"
191            );
192            // Failure to send means the receiver was dropped.
193            let _ = resp_tx.send(ExecutorEvent::Skipped {
194                test_instance: test.instance,
195                reason,
196            });
197            return;
198        }
199
200        let (req_rx_tx, req_rx_rx) = oneshot::channel();
201
202        // Wait for the Started event to be processed by the
203        // execution future.
204        _ = resp_tx.send(ExecutorEvent::Started {
205            test_instance: test.instance,
206            req_rx_tx,
207        });
208        let mut req_rx = match req_rx_rx.await {
209            Ok(rx) => rx,
210            Err(_) => {
211                // The receiver was dropped -- the dispatcher has signaled that this unit should
212                // exit.
213                return;
214            }
215        };
216
217        let mut attempt = 0;
218        let mut delay = Duration::ZERO;
219        let last_run_status = loop {
220            attempt += 1;
221            let retry_data = RetryData {
222                attempt,
223                total_attempts,
224            };
225
226            if retry_data.attempt > 1 {
227                // Ensure that the dispatcher believes the run is still ongoing.
228                // If the run is cancelled, the dispatcher will let us know by
229                // dropping the receiver.
230                let (tx, rx) = oneshot::channel();
231                _ = resp_tx.send(ExecutorEvent::RetryStarted {
232                    test_instance: test.instance,
233                    retry_data,
234                    tx,
235                });
236
237                match rx.await {
238                    Ok(()) => {}
239                    Err(_) => {
240                        // The receiver was dropped -- the dispatcher has
241                        // signaled that this unit should exit.
242                        return;
243                    }
244                }
245            }
246
247            // Some of this information is only useful for event reporting, but
248            // it's a lot easier to pass it in than to try and hook on
249            // additional information later.
250            let packet = TestPacket {
251                test_instance: test.instance,
252                cx: cx.clone(),
253                retry_data,
254                settings: settings.clone(),
255                setup_script_data: setup_script_data.clone(),
256                delay_before_start: delay,
257            };
258
259            let run_status = self.run_test(packet.clone(), &resp_tx, &mut req_rx).await;
260
261            if run_status.result.is_success() {
262                // The test succeeded.
263                break run_status;
264            } else if retry_data.attempt < retry_data.total_attempts {
265                // Retry this test: send a retry event, then retry the loop.
266                delay = backoff_iter
267                    .next()
268                    .expect("backoff delay must be non-empty");
269
270                let run_status = run_status.into_external();
271                let previous_result = run_status.result;
272                let previous_slow = run_status.is_slow;
273
274                let _ = resp_tx.send(ExecutorEvent::AttemptFailedWillRetry {
275                    test_instance: test.instance,
276                    failure_output: settings.failure_output(),
277                    run_status,
278                    delay_before_next_attempt: delay,
279                });
280
281                handle_delay_between_attempts(
282                    &packet,
283                    previous_result,
284                    previous_slow,
285                    delay,
286                    &mut req_rx,
287                )
288                .await;
289            } else {
290                // This test failed and is out of retries.
291                break run_status;
292            }
293        };
294
295        drain_req_rx(req_rx, UnitExecuteStatus::Test(&last_run_status));
296
297        // At this point, either:
298        // * the test has succeeded, or
299        // * the test has failed and we've run out of retries.
300        // In either case, the test is finished.
301        let last_run_status = last_run_status.into_external();
302        let _ = resp_tx.send(ExecutorEvent::Finished {
303            test_instance: test.instance,
304            success_output: settings.success_output(),
305            failure_output: settings.failure_output(),
306            junit_store_success_output: settings.junit_store_success_output(),
307            junit_store_failure_output: settings.junit_store_failure_output(),
308            last_run_status,
309        });
310    }
311
312    // ---
313    // Helper methods
314    // ---
315
316    /// Run an individual setup script in its own process.
317    #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
318    async fn run_setup_script(
319        &self,
320        script: SetupScriptPacket<'a>,
321        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
322        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
323    ) -> InternalSetupScriptExecuteStatus<'a> {
324        let mut stopwatch = crate::time::stopwatch();
325
326        match self
327            .run_setup_script_inner(script.clone(), &mut stopwatch, resp_tx, req_rx)
328            .await
329        {
330            Ok(status) => status,
331            Err(error) => InternalSetupScriptExecuteStatus {
332                script,
333                slow_after: None,
334                output: ChildExecutionOutput::StartError(error),
335                result: ExecutionResult::ExecFail,
336                stopwatch_end: stopwatch.snapshot(),
337                env_map: None,
338            },
339        }
340    }
341
342    #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
343    async fn run_setup_script_inner(
344        &self,
345        script: SetupScriptPacket<'a>,
346        stopwatch: &mut StopwatchStart,
347        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
348        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
349    ) -> Result<InternalSetupScriptExecuteStatus<'a>, ChildStartError> {
350        let mut cmd =
351            script.make_command(self.profile.name(), &self.double_spawn, self.test_list)?;
352        let command_mut = cmd.command_mut();
353
354        command_mut.env("NEXTEST_RUN_ID", format!("{}", self.run_id));
355        command_mut.stdin(Stdio::null());
356        super::os::set_process_group(command_mut);
357
358        // If creating a job fails, we might be on an old system. Ignore this -- job objects are a
359        // best-effort thing.
360        let job = super::os::Job::create().ok();
361
362        // The --no-capture CLI argument overrides the config.
363        if self.capture_strategy != CaptureStrategy::None {
364            if script.config.capture_stdout {
365                command_mut.stdout(std::process::Stdio::piped());
366            }
367            if script.config.capture_stderr {
368                command_mut.stderr(std::process::Stdio::piped());
369            }
370        }
371
372        let (mut child, env_path) = cmd
373            .spawn()
374            .map_err(|error| ChildStartError::Spawn(Arc::new(error)))?;
375        let child_pid = child
376            .id()
377            .expect("child has never been polled so must return a PID");
378
379        // If assigning the child to the job fails, ignore this. This can happen if the process has
380        // exited.
381        let _ = super::os::assign_process_to_job(&child, job.as_ref());
382
383        let mut status: Option<ExecutionResult> = None;
384        // Unlike with tests, we don't automatically assume setup scripts are slow if they take a
385        // long time. For example, consider a setup script that performs a cargo build -- it can
386        // take an indeterminate amount of time. That's why we set a very large slow timeout rather
387        // than the test default of 60 seconds.
388        let slow_timeout = script
389            .config
390            .slow_timeout
391            .unwrap_or(SlowTimeout::VERY_LARGE);
392        let leak_timeout = script.config.leak_timeout.unwrap_or_default();
393
394        let mut interval_sleep = std::pin::pin!(crate::time::pausable_sleep(slow_timeout.period));
395
396        let mut timeout_hit = 0;
397
398        let child_fds = ChildFds::new_split(child.stdout.take(), child.stderr.take());
399        let mut child_acc = ChildAccumulator::new(child_fds);
400
401        let mut cx = UnitContext {
402            packet: UnitPacket::SetupScript(script.clone()),
403            slow_after: None,
404        };
405
406        let (res, leaked) = {
407            let res = loop {
408                tokio::select! {
409                    () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
410                    res = child.wait() => {
411                        // The setup script finished executing.
412                        break res;
413                    }
414                    _ = &mut interval_sleep, if status.is_none() => {
415                        // Mark the script as slow.
416                        cx.slow_after = Some(slow_timeout.period);
417
418                        timeout_hit += 1;
419                        let will_terminate = if let Some(terminate_after) = slow_timeout.terminate_after {
420                            NonZeroUsize::new(timeout_hit as usize)
421                                .expect("timeout_hit was just incremented")
422                                >= terminate_after
423                        } else {
424                            false
425                        };
426
427                        if !slow_timeout.grace_period.is_zero() {
428                            let _ = resp_tx.send(script.slow_event(
429                                // Pass in the slow timeout period times timeout_hit, since
430                                // stopwatch.elapsed() tends to be slightly longer.
431                                timeout_hit * slow_timeout.period,
432                                will_terminate.then_some(slow_timeout.grace_period),
433                            ));
434                        }
435
436                        if will_terminate {
437                            // Attempt to terminate the slow script. As there is
438                            // a race between shutting down a slow test and its
439                            // own completion, we silently ignore errors to
440                            // avoid printing false warnings.
441                            //
442                            // The return result of terminate_child is not used
443                            // here, since it is always marked as a timeout.
444                            _ = super::os::terminate_child(
445                                &cx,
446                                &mut child,
447                                &mut child_acc,
448                                InternalTerminateReason::Timeout,
449                                stopwatch,
450                                req_rx,
451                                job.as_ref(),
452                                slow_timeout.grace_period,
453                            ).await;
454                            status = Some(ExecutionResult::Timeout);
455                            if slow_timeout.grace_period.is_zero() {
456                                break child.wait().await;
457                            }
458                            // Don't break here to give the wait task a chance to finish.
459                        } else {
460                            interval_sleep.as_mut().reset_last_duration();
461                        }
462                    }
463                    recv = req_rx.recv() => {
464                        // The sender stays open longer than the whole loop, and the buffer is big
465                        // enough for all messages ever sent through this channel, so a RecvError
466                        // should never happen.
467                        let req = recv.expect("a RecvError should never happen here");
468
469                        match req {
470                            RunUnitRequest::Signal(req) => {
471                                #[cfg_attr(not(windows), expect(unused_variables))]
472                                let res = handle_signal_request(
473                                    &cx,
474                                    &mut child,
475                                    &mut child_acc,
476                                    req,
477                                    stopwatch,
478                                    interval_sleep.as_mut(),
479                                    req_rx,
480                                    job.as_ref(),
481                                    slow_timeout.grace_period
482                                ).await;
483
484                                // On Unix, the signal the process exited with
485                                // will be picked up by child.wait. On Windows,
486                                // termination by job object will show up as
487                                // exit code 1 -- we need to be clearer about
488                                // that in the UI.
489                                //
490                                // TODO: Can we do something useful with res on
491                                // Unix? For example, it's possible that the
492                                // signal we send is not the same as the signal
493                                // the child exits with. This might be a good
494                                // thing to store in whatever test event log we
495                                // end up building.
496                                #[cfg(windows)]
497                                {
498                                    if matches!(
499                                        res,
500                                        HandleSignalResult::Terminated(super::TerminateChildResult::Killed)
501                                    ) {
502                                        status = Some(ExecutionResult::Fail {
503                                            failure_status: FailureStatus::Abort(
504                                                crate::reporter::events::AbortStatus::JobObject,
505                                            ),
506                                            leaked: false,
507                                        });
508                                    }
509                                }
510                            }
511                            RunUnitRequest::OtherCancel => {
512                                // Ignore non-signal cancellation requests --
513                                // let the script finish.
514                            }
515                            RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
516                                _ = sender.send(script.info_response(
517                                    UnitState::Running {
518                                        pid: child_pid,
519                                        time_taken:             stopwatch.snapshot().active,
520                                        slow_after: cx.slow_after,
521                                    },
522                                    child_acc.snapshot_in_progress(UnitKind::WAITING_ON_SCRIPT_MESSAGE),
523                                ));
524                            }
525                        }
526                    }
527                }
528            };
529
530            // Build a tentative status using status and the exit status.
531            let tentative_status = status.or_else(|| {
532                res.as_ref().ok().map(|res| {
533                    create_execution_result(*res, &child_acc.errors, false, LeakTimeoutResult::Pass)
534                })
535            });
536
537            let leaked = detect_fd_leaks(
538                &cx,
539                child_pid,
540                &mut child_acc,
541                tentative_status,
542                leak_timeout,
543                stopwatch,
544                req_rx,
545            )
546            .await;
547
548            (res, leaked)
549        };
550
551        let exit_status = match res {
552            Ok(exit_status) => Some(exit_status),
553            Err(err) => {
554                child_acc.errors.push(ChildFdError::Wait(Arc::new(err)));
555                None
556            }
557        };
558
559        let exit_status = exit_status.expect("None always results in early return");
560
561        let exec_result = status.unwrap_or_else(|| {
562            create_execution_result(exit_status, &child_acc.errors, leaked, leak_timeout.result)
563        });
564
565        // Read from the environment map. If there's an error here, add it to the list of child errors.
566        let mut errors: Vec<_> = child_acc.errors.into_iter().map(ChildError::from).collect();
567        let env_map = if exec_result.is_success() {
568            match parse_env_file(&env_path).await {
569                Ok(env_map) => Some(env_map),
570                Err(error) => {
571                    errors.push(ChildError::SetupScriptOutput(error));
572                    None
573                }
574            }
575        } else {
576            None
577        };
578
579        Ok(InternalSetupScriptExecuteStatus {
580            script,
581            slow_after: cx.slow_after,
582            output: ChildExecutionOutput::Output {
583                result: Some(exec_result),
584                output: child_acc.output.freeze(),
585                errors: ErrorList::new(UnitKind::WAITING_ON_SCRIPT_MESSAGE, errors),
586            },
587            result: exec_result,
588            stopwatch_end: stopwatch.snapshot(),
589            env_map,
590        })
591    }
592
593    /// Run an individual test in its own process.
594    #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
595    async fn run_test(
596        &self,
597        test: TestPacket<'a>,
598        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
599        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
600    ) -> InternalExecuteStatus<'a> {
601        let mut stopwatch = crate::time::stopwatch();
602
603        match self
604            .run_test_inner(test.clone(), &mut stopwatch, resp_tx, req_rx)
605            .await
606        {
607            Ok(run_status) => run_status,
608            Err(error) => InternalExecuteStatus {
609                test,
610                slow_after: None,
611                output: ChildExecutionOutput::StartError(error),
612                result: ExecutionResult::ExecFail,
613                stopwatch_end: stopwatch.snapshot(),
614            },
615        }
616    }
617
618    async fn run_test_inner(
619        &self,
620        test: TestPacket<'a>,
621        stopwatch: &mut StopwatchStart,
622        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
623        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
624    ) -> Result<InternalExecuteStatus<'a>, ChildStartError> {
625        let ctx = TestExecuteContext {
626            profile_name: self.profile.name(),
627            double_spawn: &self.double_spawn,
628            target_runner: &self.target_runner,
629        };
630        let mut cmd = test.test_instance.make_command(
631            &ctx,
632            self.test_list,
633            test.settings.run_wrapper(),
634            test.settings.run_extra_args(),
635        );
636        let command_mut = cmd.command_mut();
637
638        // Debug environment variable for testing.
639        command_mut.env("__NEXTEST_ATTEMPT", format!("{}", test.retry_data.attempt));
640
641        command_mut.env("NEXTEST_RUN_ID", format!("{}", self.run_id));
642
643        // Set group and slot environment variables.
644        command_mut.env(
645            "NEXTEST_TEST_GLOBAL_SLOT",
646            test.cx.global_slot().to_string(),
647        );
648        match test.settings.test_group() {
649            TestGroup::Custom(name) => {
650                debug_assert!(
651                    test.cx.group_slot().is_some(),
652                    "test_group being set implies group_slot is set"
653                );
654                command_mut.env("NEXTEST_TEST_GROUP", name.as_str());
655            }
656            TestGroup::Global => {
657                debug_assert!(
658                    test.cx.group_slot().is_none(),
659                    "test_group being unset implies group_slot is unset"
660                );
661                command_mut.env("NEXTEST_TEST_GROUP", TestGroup::GLOBAL_STR);
662            }
663        }
664        if let Some(group_slot) = test.cx.group_slot() {
665            command_mut.env("NEXTEST_TEST_GROUP_SLOT", group_slot.to_string());
666        } else {
667            command_mut.env("NEXTEST_TEST_GROUP_SLOT", "none");
668        }
669
670        command_mut.stdin(Stdio::null());
671        test.setup_script_data.apply(
672            &test.test_instance.to_test_query(),
673            &self.profile.filterset_ecx(),
674            command_mut,
675        );
676        super::os::set_process_group(command_mut);
677
678        // If creating a job fails, we might be on an old system. Ignore this -- job objects are a
679        // best-effort thing.
680        let job = super::os::Job::create().ok();
681
682        let crate::test_command::Child {
683            mut child,
684            child_fds,
685        } = cmd
686            .spawn(self.capture_strategy)
687            .map_err(|error| ChildStartError::Spawn(Arc::new(error)))?;
688
689        // Note: The PID stored here must be used with care -- it might be
690        // outdated and have been reused by the kernel in case the process
691        // has exited. If using for any real logic (not just reporting) it
692        // might be best to always check child.id().
693        let child_pid = child
694            .id()
695            .expect("child has never been polled so must return a PID");
696
697        // If assigning the child to the job fails, ignore this. This can happen if the process has
698        // exited.
699        let _ = super::os::assign_process_to_job(&child, job.as_ref());
700
701        let mut child_acc = ChildAccumulator::new(child_fds);
702
703        let mut status: Option<ExecutionResult> = None;
704        let slow_timeout = test.settings.slow_timeout();
705        let leak_timeout = test.settings.leak_timeout();
706
707        // Use a pausable_sleep rather than an interval here because it's much
708        // harder to pause and resume an interval.
709        let mut interval_sleep = std::pin::pin!(crate::time::pausable_sleep(slow_timeout.period));
710
711        let mut timeout_hit = 0;
712
713        let mut cx = UnitContext {
714            packet: UnitPacket::Test(test.clone()),
715            slow_after: None,
716        };
717
718        let (res, leaked) = {
719            let res = loop {
720                tokio::select! {
721                    () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
722                    res = child.wait() => {
723                        // The test finished executing.
724                        break res;
725                    }
726                    _ = &mut interval_sleep, if status.is_none() => {
727                        // Mark the test as slow.
728                        cx.slow_after = Some(slow_timeout.period);
729
730                        timeout_hit += 1;
731                        let will_terminate = if let Some(terminate_after) = slow_timeout.terminate_after {
732                            NonZeroUsize::new(timeout_hit as usize)
733                                .expect("timeout_hit was just incremented")
734                                >= terminate_after
735                        } else {
736                            false
737                        };
738
739                        if !slow_timeout.grace_period.is_zero() {
740                            let _ = resp_tx.send(test.slow_event(
741                                // Pass in the slow timeout period times timeout_hit, since
742                                // stopwatch.elapsed() tends to be slightly longer.
743                                timeout_hit * slow_timeout.period,
744                                will_terminate.then_some(slow_timeout.grace_period),
745                            ));
746                        }
747
748                        if will_terminate {
749                            // Attempt to terminate the slow test. As there is a
750                            // race between shutting down a slow test and its
751                            // own completion, we silently ignore errors to
752                            // avoid printing false warnings.
753                            //
754                            // The return result of terminate_child is not used
755                            // here, since it is always marked as a timeout.
756                            _ = super::os::terminate_child(
757                                &cx,
758                                &mut child,
759                                &mut child_acc,
760                                InternalTerminateReason::Timeout,
761                                stopwatch,
762                                req_rx,
763                                job.as_ref(),
764                                slow_timeout.grace_period,
765                            ).await;
766                            status = Some(ExecutionResult::Timeout);
767                            if slow_timeout.grace_period.is_zero() {
768                                break child.wait().await;
769                            }
770                            // Don't break here to give the wait task a chance to finish.
771                        } else {
772                            interval_sleep.as_mut().reset_last_duration();
773                        }
774                    }
775                    recv = req_rx.recv() => {
776                        // The sender stays open longer than the whole loop so a
777                        // RecvError should never happen.
778                        let req = recv.expect("req_rx sender is open");
779
780                        match req {
781                            RunUnitRequest::Signal(req) => {
782                                #[cfg_attr(not(windows), expect(unused_variables))]
783                                let res = handle_signal_request(
784                                    &cx,
785                                    &mut child,
786                                    &mut child_acc,
787                                    req,
788                                    stopwatch,
789                                    interval_sleep.as_mut(),
790                                    req_rx,
791                                    job.as_ref(),
792                                    slow_timeout.grace_period,
793                                ).await;
794
795                                // On Unix, the signal the process exited with
796                                // will be picked up by child.wait. On Windows,
797                                // termination by job object will show up as
798                                // exit code 1 -- we need to be clearer about
799                                // that in the UI.
800                                //
801                                // TODO: Can we do something useful with res on
802                                // Unix? For example, it's possible that the
803                                // signal we send is not the same as the signal
804                                // the child exits with. This might be a good
805                                // thing to store in whatever test event log we
806                                // end up building.
807                                #[cfg(windows)]
808                                {
809                                    if matches!(
810                                        res,
811                                        HandleSignalResult::Terminated(super::TerminateChildResult::Killed)
812                                    ) {
813                                        status = Some(ExecutionResult::Fail {
814                                            failure_status: FailureStatus::Abort(
815                                                crate::reporter::events::AbortStatus::JobObject,
816                                            ),
817                                            leaked: false,
818                                        });
819                                    }
820                                }
821                            }
822                            RunUnitRequest::OtherCancel => {
823                                // Ignore non-signal cancellation requests --
824                                // let the test finish.
825                            }
826                            RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
827                                _ = tx.send(test.info_response(
828                                    UnitState::Running {
829                                        pid: child_pid,
830                                        time_taken: stopwatch.snapshot().active,
831                                        slow_after: cx.slow_after,
832                                    },
833                                    child_acc.snapshot_in_progress(UnitKind::WAITING_ON_TEST_MESSAGE),
834                                ));
835                            }
836                        }
837                    }
838                };
839            };
840
841            // Build a tentative status using status and the exit status.
842            let tentative_status = status.or_else(|| {
843                res.as_ref().ok().map(|res| {
844                    create_execution_result(*res, &child_acc.errors, false, LeakTimeoutResult::Pass)
845                })
846            });
847
848            let leaked = detect_fd_leaks(
849                &cx,
850                child_pid,
851                &mut child_acc,
852                tentative_status,
853                leak_timeout,
854                stopwatch,
855                req_rx,
856            )
857            .await;
858
859            (res, leaked)
860        };
861
862        let exit_status = match res {
863            Ok(exit_status) => Some(exit_status),
864            Err(err) => {
865                child_acc.errors.push(ChildFdError::Wait(Arc::new(err)));
866                None
867            }
868        };
869
870        let exit_status = exit_status.expect("None always results in early return");
871        let exec_result = status.unwrap_or_else(|| {
872            create_execution_result(exit_status, &child_acc.errors, leaked, leak_timeout.result)
873        });
874
875        Ok(InternalExecuteStatus {
876            test,
877            slow_after: cx.slow_after,
878            output: ChildExecutionOutput::Output {
879                result: Some(exec_result),
880                output: child_acc.output.freeze(),
881                errors: ErrorList::new(UnitKind::WAITING_ON_TEST_MESSAGE, child_acc.errors),
882            },
883            result: exec_result,
884            stopwatch_end: stopwatch.snapshot(),
885        })
886    }
887}
888
889#[derive(Debug)]
890struct BackoffIter {
891    policy: RetryPolicy,
892    current_factor: f64,
893    remaining_attempts: usize,
894}
895
896impl BackoffIter {
897    const BACKOFF_EXPONENT: f64 = 2.;
898
899    fn new(policy: RetryPolicy) -> Self {
900        let remaining_attempts = policy.count();
901        Self {
902            policy,
903            current_factor: 1.,
904            remaining_attempts,
905        }
906    }
907
908    fn next_delay_and_jitter(&mut self) -> (Duration, bool) {
909        match self.policy {
910            RetryPolicy::Fixed { delay, jitter, .. } => (delay, jitter),
911            RetryPolicy::Exponential {
912                delay,
913                jitter,
914                max_delay,
915                ..
916            } => {
917                let factor = self.current_factor;
918                let exp_delay = delay.mul_f64(factor);
919
920                // Stop multiplying the exponential factor if delay is greater than max_delay.
921                if let Some(max_delay) = max_delay {
922                    if exp_delay > max_delay {
923                        return (max_delay, jitter);
924                    }
925                }
926
927                let next_factor = self.current_factor * Self::BACKOFF_EXPONENT;
928                self.current_factor = next_factor;
929
930                (exp_delay, jitter)
931            }
932        }
933    }
934
935    fn apply_jitter(duration: Duration) -> Duration {
936        let jitter: f64 = rand::rng().sample(OpenClosed01);
937        // Apply jitter in the range (0.5, 1].
938        duration.mul_f64(0.5 + jitter / 2.)
939    }
940}
941
942impl Iterator for BackoffIter {
943    type Item = Duration;
944    fn next(&mut self) -> Option<Self::Item> {
945        if self.remaining_attempts > 0 {
946            let (mut delay, jitter) = self.next_delay_and_jitter();
947            if jitter {
948                delay = Self::apply_jitter(delay);
949            }
950            self.remaining_attempts -= 1;
951            Some(delay)
952        } else {
953            None
954        }
955    }
956}
957
958/// Either a test or a setup script, along with information about how long the
959/// test took.
960pub(super) struct UnitContext<'a> {
961    packet: UnitPacket<'a>,
962    // TODO: This is a bit of a mess. It isn't clear where this kind of state
963    // should live -- many parts of the request-response system need various
964    // pieces of this code.
965    slow_after: Option<Duration>,
966}
967
968impl<'a> UnitContext<'a> {
969    pub(super) fn packet(&self) -> &UnitPacket<'a> {
970        &self.packet
971    }
972
973    pub(super) fn info_response(
974        &self,
975        state: UnitState,
976        output: ChildExecutionOutput,
977    ) -> InfoResponse<'a> {
978        match &self.packet {
979            UnitPacket::SetupScript(packet) => packet.info_response(state, output),
980            UnitPacket::Test(packet) => packet.info_response(state, output),
981        }
982    }
983}
984
985#[derive(Clone, Debug)]
986pub(super) enum UnitPacket<'a> {
987    SetupScript(SetupScriptPacket<'a>),
988    Test(TestPacket<'a>),
989}
990
991impl UnitPacket<'_> {
992    pub(super) fn kind(&self) -> UnitKind {
993        match self {
994            Self::SetupScript(_) => UnitKind::Script,
995            Self::Test(_) => UnitKind::Test,
996        }
997    }
998}
999
1000#[derive(Clone)]
1001pub(super) struct TestPacket<'a> {
1002    test_instance: TestInstance<'a>,
1003    cx: FutureQueueContext,
1004    retry_data: RetryData,
1005    settings: Arc<TestSettings<'a>>,
1006    setup_script_data: Arc<SetupScriptExecuteData<'a>>,
1007    delay_before_start: Duration,
1008}
1009
1010impl<'a> TestPacket<'a> {
1011    fn slow_event(&self, elapsed: Duration, will_terminate: Option<Duration>) -> ExecutorEvent<'a> {
1012        ExecutorEvent::Slow {
1013            test_instance: self.test_instance,
1014            retry_data: self.retry_data,
1015            elapsed,
1016            will_terminate,
1017        }
1018    }
1019
1020    pub(super) fn retry_data(&self) -> RetryData {
1021        self.retry_data
1022    }
1023
1024    pub(super) fn delay_before_start(&self) -> Duration {
1025        self.delay_before_start
1026    }
1027
1028    pub(super) fn info_response(
1029        &self,
1030        state: UnitState,
1031        output: ChildExecutionOutput,
1032    ) -> InfoResponse<'a> {
1033        InfoResponse::Test(TestInfoResponse {
1034            test_instance: self.test_instance.id(),
1035            state,
1036            retry_data: self.retry_data,
1037            output,
1038        })
1039    }
1040}
1041
1042impl fmt::Debug for TestPacket<'_> {
1043    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1044        f.debug_struct("TestPacket")
1045            .field("test_instance", &self.test_instance.id())
1046            .field("cx", &self.cx)
1047            .finish_non_exhaustive()
1048    }
1049}
1050
1051#[derive(Clone, Debug)]
1052pub(super) struct SetupScriptPacket<'a> {
1053    script_id: ScriptId,
1054    config: &'a SetupScriptConfig,
1055    program: String,
1056}
1057
1058impl<'a> SetupScriptPacket<'a> {
1059    /// Turns self into a command that can be executed.
1060    fn make_command(
1061        &self,
1062        profile_name: &str,
1063        double_spawn: &DoubleSpawnInfo,
1064        test_list: &TestList<'_>,
1065    ) -> Result<SetupScriptCommand, ChildStartError> {
1066        SetupScriptCommand::new(self.config, profile_name, double_spawn, test_list)
1067    }
1068
1069    fn slow_event(&self, elapsed: Duration, will_terminate: Option<Duration>) -> ExecutorEvent<'a> {
1070        ExecutorEvent::SetupScriptSlow {
1071            script_id: self.script_id.clone(),
1072            config: self.config,
1073            program: self.program.clone(),
1074            elapsed,
1075            will_terminate,
1076        }
1077    }
1078
1079    pub(super) fn info_response(
1080        &self,
1081        state: UnitState,
1082        output: ChildExecutionOutput,
1083    ) -> InfoResponse<'a> {
1084        InfoResponse::SetupScript(SetupScriptInfoResponse {
1085            script_id: self.script_id.clone(),
1086            program: self.program.clone(),
1087            args: &self.config.command.args,
1088            state,
1089            output,
1090        })
1091    }
1092}
1093
1094/// Drains the request receiver of any messages.
1095fn drain_req_rx<'a>(
1096    mut receiver: UnboundedReceiver<RunUnitRequest<'a>>,
1097    status: UnitExecuteStatus<'a, '_>,
1098) {
1099    // Mark the receiver closed so no further messages are sent.
1100    receiver.close();
1101    loop {
1102        // Receive anything that's left in the receiver.
1103        let message = receiver.try_recv();
1104        match message {
1105            Ok(message) => {
1106                message.drain(status);
1107            }
1108            Err(_) => {
1109                break;
1110            }
1111        }
1112    }
1113}
1114
1115async fn handle_delay_between_attempts<'a>(
1116    packet: &TestPacket<'a>,
1117    previous_result: ExecutionResult,
1118    previous_slow: bool,
1119    delay: Duration,
1120    req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
1121) {
1122    let mut sleep = std::pin::pin!(crate::time::pausable_sleep(delay));
1123    #[cfg_attr(not(unix), expect(unused_mut))]
1124    let mut waiting_stopwatch = crate::time::stopwatch();
1125
1126    loop {
1127        tokio::select! {
1128            _ = &mut sleep => {
1129                // The timer has expired.
1130                break;
1131            }
1132            recv = req_rx.recv() => {
1133                let req = recv.expect("req_rx sender is open");
1134
1135                match req {
1136                    #[cfg(unix)]
1137                    RunUnitRequest::Signal(SignalRequest::Stop(tx)) => {
1138                        sleep.as_mut().pause();
1139                        waiting_stopwatch.pause();
1140                        _ = tx.send(());
1141                    }
1142                    #[cfg(unix)]
1143                    RunUnitRequest::Signal(SignalRequest::Continue) => {
1144                        if sleep.is_paused() {
1145                            sleep.as_mut().resume();
1146                            waiting_stopwatch.resume();
1147                        }
1148                    }
1149                    RunUnitRequest::Signal(SignalRequest::Shutdown(_)) => {
1150                        // The run was cancelled, so go ahead and perform a
1151                        // shutdown.
1152                        break;
1153                    }
1154                    RunUnitRequest::OtherCancel => {
1155                        // If a cancellation was requested, break out of the
1156                        // loop.
1157                        break;
1158                    }
1159                    RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
1160                        let waiting_snapshot = waiting_stopwatch.snapshot();
1161                        _ = tx.send(
1162                            packet.info_response(
1163                                UnitState::DelayBeforeNextAttempt {
1164                                    previous_result,
1165                                    previous_slow,
1166                                    waiting_duration: waiting_snapshot.active,
1167                                    remaining: delay
1168                                        .checked_sub(waiting_snapshot.active)
1169                                        .unwrap_or_default(),
1170                                },
1171                                // This field is ignored but our data model
1172                                // requires it.
1173                                ChildExecutionOutput::Output {
1174                                    result: None,
1175                                    output: ChildOutput::Split(ChildSplitOutput {
1176                                        stdout: None,
1177                                        stderr: None,
1178                                    }),
1179                                    errors: None,
1180                                },
1181                            ),
1182                        );
1183                    }
1184                }
1185            }
1186        }
1187    }
1188}
1189
1190/// After a child process has exited, detect if it leaked file handles by
1191/// leaving long-running grandchildren open.
1192///
1193/// This is done by waiting for a short period of time after the child has
1194/// exited, and checking if stdout and stderr are still open. In the future, we
1195/// could do more sophisticated checks around e.g. if any processes with the
1196/// same PGID are around.
1197async fn detect_fd_leaks<'a>(
1198    cx: &UnitContext<'a>,
1199    child_pid: u32,
1200    child_acc: &mut ChildAccumulator,
1201    tentative_result: Option<ExecutionResult>,
1202    leak_timeout: LeakTimeout,
1203    stopwatch: &mut StopwatchStart,
1204    req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
1205) -> bool {
1206    loop {
1207        // Ignore stop and continue events here since the leak timeout should be very small.
1208        // TODO: we may want to consider them.
1209        let mut sleep = std::pin::pin!(tokio::time::sleep(leak_timeout.period));
1210        let waiting_stopwatch = crate::time::stopwatch();
1211
1212        tokio::select! {
1213            // All of the branches here need to check for
1214            // `!child_acc.fds.is_done()`, because if child_fds is done we want
1215            // to hit the `else` block right away.
1216            () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
1217            () = &mut sleep, if !child_acc.fds.is_done() => {
1218                break true;
1219            }
1220            recv = req_rx.recv(), if !child_acc.fds.is_done() => {
1221                // The sender stays open longer than the whole loop, and the
1222                // buffer is big enough for all messages ever sent through this
1223                // channel, so a RecvError should never happen.
1224                let req = recv.expect("a RecvError should never happen here");
1225
1226                match req {
1227                    RunUnitRequest::Signal(_) => {
1228                        // The process is done executing, so signals are moot.
1229                    }
1230                    RunUnitRequest::OtherCancel => {
1231                        // Ignore non-signal cancellation requests -- let the
1232                        // unit finish.
1233                    }
1234                    RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
1235                        let snapshot = waiting_stopwatch.snapshot();
1236                        let resp = cx.info_response(
1237                            UnitState::Exiting {
1238                                // Because we've polled that the child is done,
1239                                // child.id() will likely return None at this
1240                                // point. Use the cached PID since this is just
1241                                // for reporting.
1242                                pid: child_pid,
1243                                time_taken: stopwatch.snapshot().active,
1244                                slow_after: cx.slow_after,
1245                                tentative_result,
1246                                waiting_duration: snapshot.active,
1247                                remaining: leak_timeout.period
1248                                    .checked_sub(snapshot.active)
1249                                    .unwrap_or_default(),
1250                            },
1251                            child_acc.snapshot_in_progress(cx.packet.kind().waiting_on_message()),
1252                        );
1253
1254                        _ = sender.send(resp);
1255                    }
1256                }
1257            }
1258            else => {
1259                break false;
1260            }
1261        }
1262    }
1263}
1264
1265// It would be nice to fix this function to not have so many arguments, but this
1266// code is actively being refactored right now and imposing too much structure
1267// can cause more harm than good.
1268#[expect(clippy::too_many_arguments)]
1269async fn handle_signal_request<'a>(
1270    cx: &UnitContext<'a>,
1271    child: &mut Child,
1272    child_acc: &mut ChildAccumulator,
1273    req: SignalRequest,
1274    stopwatch: &mut StopwatchStart,
1275    #[cfg_attr(not(unix), expect(unused_mut, unused_variables))] mut interval_sleep: Pin<
1276        &mut PausableSleep,
1277    >,
1278    req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
1279    job: Option<&super::os::Job>,
1280    grace_period: Duration,
1281) -> HandleSignalResult {
1282    match req {
1283        #[cfg(unix)]
1284        SignalRequest::Stop(sender) => {
1285            // It isn't possible to receive a stop event twice since it gets
1286            // debounced in the main signal handler.
1287            stopwatch.pause();
1288            interval_sleep.as_mut().pause();
1289            super::os::job_control_child(child, crate::signal::JobControlEvent::Stop);
1290            // The receiver being dead probably means the main thread panicked
1291            // or similar.
1292            let _ = sender.send(());
1293            HandleSignalResult::JobControl
1294        }
1295        #[cfg(unix)]
1296        SignalRequest::Continue => {
1297            // It's possible to receive a resume event right at the beginning of
1298            // test execution, so debounce it.
1299            if stopwatch.is_paused() {
1300                stopwatch.resume();
1301                interval_sleep.as_mut().resume();
1302                super::os::job_control_child(child, crate::signal::JobControlEvent::Continue);
1303            }
1304            HandleSignalResult::JobControl
1305        }
1306        SignalRequest::Shutdown(event) => {
1307            let res = super::os::terminate_child(
1308                cx,
1309                child,
1310                child_acc,
1311                InternalTerminateReason::Signal(event),
1312                stopwatch,
1313                req_rx,
1314                job,
1315                grace_period,
1316            )
1317            .await;
1318            HandleSignalResult::Terminated(res)
1319        }
1320    }
1321}
1322
1323fn create_execution_result(
1324    exit_status: ExitStatus,
1325    child_errors: &[ChildFdError],
1326    leaked: bool,
1327    leak_timeout_result: LeakTimeoutResult,
1328) -> ExecutionResult {
1329    if !child_errors.is_empty() {
1330        // If an error occurred while waiting on the child handles, treat it as
1331        // an execution failure.
1332        ExecutionResult::ExecFail
1333    } else if exit_status.success() {
1334        if leaked {
1335            // Note: this is test passed (exited with code 0) + leaked handles,
1336            // not test failed and also leaked handles.
1337            ExecutionResult::Leak {
1338                result: leak_timeout_result,
1339            }
1340        } else {
1341            ExecutionResult::Pass
1342        }
1343    } else {
1344        ExecutionResult::Fail {
1345            failure_status: FailureStatus::extract(exit_status),
1346            leaked,
1347        }
1348    }
1349}