nextest_runner/runner/executor.rs

// Copyright (c) The nextest Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

//! The executor for tests.
//!
//! This component is responsible for running tests and reporting results to the
//! dispatcher.
//!
//! Note that the executor itself does not communicate directly with the outside
//! world. All communication is mediated by the dispatcher -- doing so is not
//! just a better abstraction; it also provides a better user experience (less
//! inconsistent state).

use super::HandleSignalResult;
use crate::{
    config::{
        core::EvaluatableProfile,
        elements::{LeakTimeout, LeakTimeoutResult, RetryPolicy, SlowTimeout, TestGroup},
        overrides::TestSettings,
        scripts::{ScriptId, SetupScriptCommand, SetupScriptConfig, SetupScriptExecuteData},
    },
    double_spawn::DoubleSpawnInfo,
    errors::{ChildError, ChildFdError, ChildStartError, ErrorList},
    list::{TestExecuteContext, TestInstance, TestInstanceWithSettings, TestList},
    reporter::events::{
        ExecutionResult, FailureStatus, InfoResponse, RetryData, SetupScriptInfoResponse,
        StressIndex, TestInfoResponse, UnitKind, UnitState,
    },
    runner::{
        ExecutorEvent, InternalExecuteStatus, InternalSetupScriptExecuteStatus,
        InternalTerminateReason, RunUnitQuery, RunUnitRequest, SignalRequest, UnitExecuteStatus,
        parse_env_file,
    },
    target_runner::TargetRunner,
    test_command::{ChildAccumulator, ChildFds},
    test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
    time::{PausableSleep, StopwatchStart},
};
use future_queue::FutureQueueContext;
use nextest_metadata::FilterMatch;
use quick_junit::ReportUuid;
use rand::{Rng, distr::OpenClosed01};
use std::{
    fmt,
    num::NonZeroUsize,
    pin::Pin,
    process::{ExitStatus, Stdio},
    sync::Arc,
    time::Duration,
};
use tokio::{
    process::Child,
    sync::{
        mpsc::{UnboundedReceiver, UnboundedSender},
        oneshot,
    },
};
use tracing::{debug, instrument};

#[derive(Debug)]
pub(super) struct ExecutorContext<'a> {
    run_id: ReportUuid,
    profile: &'a EvaluatableProfile<'a>,
    test_list: &'a TestList<'a>,
    double_spawn: DoubleSpawnInfo,
    target_runner: TargetRunner,
    capture_strategy: CaptureStrategy,
    // This is Some if the user specifies a retry policy on the command line.
    force_retries: Option<RetryPolicy>,
}

impl<'a> ExecutorContext<'a> {
    pub(super) fn new(
        run_id: ReportUuid,
        profile: &'a EvaluatableProfile<'a>,
        test_list: &'a TestList<'a>,
        double_spawn: DoubleSpawnInfo,
        target_runner: TargetRunner,
        capture_strategy: CaptureStrategy,
        force_retries: Option<RetryPolicy>,
    ) -> Self {
        Self {
            run_id,
            profile,
            test_list,
            double_spawn,
            target_runner,
            capture_strategy,
            force_retries,
        }
    }

    /// Run setup scripts, returning data about each successfully executed script.
    pub(super) async fn run_setup_scripts(
        &self,
        stress_index: Option<StressIndex>,
        resp_tx: UnboundedSender<ExecutorEvent<'a>>,
    ) -> SetupScriptExecuteData<'a> {
        let setup_scripts = self.profile.setup_scripts(self.test_list);
        let total = setup_scripts.len();
        debug!("running {} setup scripts", total);

        let mut setup_script_data = SetupScriptExecuteData::new();

        // Run setup scripts one by one.
        for (index, script) in setup_scripts.into_iter().enumerate() {
            let this_resp_tx = resp_tx.clone();

            let script_id = script.id.clone();
            let config = script.config;
            let program = config.command.program(
                self.test_list.workspace_root(),
                &self.test_list.rust_build_meta().target_directory,
            );

            let script_fut = async move {
                let (req_rx_tx, req_rx_rx) = oneshot::channel();
                let _ = this_resp_tx.send(ExecutorEvent::SetupScriptStarted {
                    stress_index,
                    script_id: script_id.clone(),
                    config,
                    program: program.clone(),
                    index,
                    total,
                    req_rx_tx,
                });
                let mut req_rx = match req_rx_rx.await {
                    Ok(req_rx) => req_rx,
                    Err(_) => {
                        // The receiver was dropped -- the dispatcher has
                        // signaled that this unit should exit.
                        return None;
                    }
                };

                let packet = SetupScriptPacket {
                    stress_index,
                    script_id: script_id.clone(),
                    config,
                    program: program.clone(),
                };

                let status = self
                    .run_setup_script(packet, &this_resp_tx, &mut req_rx)
                    .await;

                // Drain the request receiver, responding to any final requests
                // that may have been sent.
                drain_req_rx(req_rx, UnitExecuteStatus::SetupScript(&status));

                let status = status.into_external();
                let env_map = status.env_map.clone();

                let _ = this_resp_tx.send(ExecutorEvent::SetupScriptFinished {
                    stress_index,
                    script_id,
                    config,
                    program,
                    index,
                    total,
                    status,
                });

                env_map.map(|env_map| (script, env_map))
            };

            // Run this setup script to completion.
            if let Some((script, env_map)) = script_fut.await {
                setup_script_data.add_script(script, env_map);
            }
        }

        setup_script_data
    }

    /// Runs all attempts of a single test instance.
    pub(super) async fn run_test_instance(
        &self,
        stress_index: Option<StressIndex>,
        test: TestInstanceWithSettings<'a>,
        cx: FutureQueueContext,
        resp_tx: UnboundedSender<ExecutorEvent<'a>>,
        setup_script_data: Arc<SetupScriptExecuteData<'a>>,
    ) {
        debug!(test_name = test.instance.name, "running test");

        let settings = Arc::new(test.settings);

        let retry_policy = self.force_retries.unwrap_or_else(|| settings.retries());
        let total_attempts = retry_policy.count() + 1;
        let mut backoff_iter = BackoffIter::new(retry_policy);

        if let FilterMatch::Mismatch { reason } = test.instance.test_info.filter_match {
            debug_assert!(
                false,
                "this test should already have been skipped in a filter step"
            );
            // Failure to send means the receiver was dropped.
            let _ = resp_tx.send(ExecutorEvent::Skipped {
                stress_index,
                test_instance: test.instance,
                reason,
            });
            return;
        }

        let (req_rx_tx, req_rx_rx) = oneshot::channel();

        // Wait for the Started event to be processed by the
        // execution future.
        _ = resp_tx.send(ExecutorEvent::Started {
            stress_index,
            test_instance: test.instance,
            req_rx_tx,
        });
        let mut req_rx = match req_rx_rx.await {
            Ok(rx) => rx,
            Err(_) => {
                // The receiver was dropped -- the dispatcher has signaled that this unit should
                // exit.
                return;
            }
        };

        let mut attempt = 0;
        let mut delay = Duration::ZERO;
        let last_run_status = loop {
            attempt += 1;
            let retry_data = RetryData {
                attempt,
                total_attempts,
            };

            if retry_data.attempt > 1 {
                // Ensure that the dispatcher believes the run is still ongoing.
                // If the run is cancelled, the dispatcher will let us know by
                // dropping the receiver.
                let (tx, rx) = oneshot::channel();
                _ = resp_tx.send(ExecutorEvent::RetryStarted {
                    stress_index,
                    test_instance: test.instance,
                    retry_data,
                    tx,
                });

                match rx.await {
                    Ok(()) => {}
                    Err(_) => {
                        // The receiver was dropped -- the dispatcher has
                        // signaled that this unit should exit.
                        return;
                    }
                }
            }

            // Some of this information is only useful for event reporting, but
            // it's a lot easier to pass it in here than to try to attach
            // additional information later.
            let packet = TestPacket {
                stress_index,
                test_instance: test.instance,
                cx: cx.clone(),
                retry_data,
                settings: settings.clone(),
                setup_script_data: setup_script_data.clone(),
                delay_before_start: delay,
            };

            let run_status = self.run_test(packet.clone(), &resp_tx, &mut req_rx).await;

            if run_status.result.is_success() {
                // The test succeeded.
                break run_status;
            } else if retry_data.attempt < retry_data.total_attempts {
                // This attempt failed but there are retries left: send a retry
                // event, then run the loop again.
                delay = backoff_iter
                    .next()
                    .expect("backoff iterator must yield a delay for each retry");

                let run_status = run_status.into_external();
                let previous_result = run_status.result;
                let previous_slow = run_status.is_slow;

                let _ = resp_tx.send(ExecutorEvent::AttemptFailedWillRetry {
                    stress_index,
                    test_instance: test.instance,
                    failure_output: settings.failure_output(),
                    run_status,
                    delay_before_next_attempt: delay,
                });

                handle_delay_between_attempts(
                    &packet,
                    previous_result,
                    previous_slow,
                    delay,
                    &mut req_rx,
                )
                .await;
            } else {
                // This test failed and is out of retries.
                break run_status;
            }
        };

        drain_req_rx(req_rx, UnitExecuteStatus::Test(&last_run_status));

        // At this point, either:
        // * the test has succeeded, or
        // * the test has failed and we've run out of retries.
        // In either case, the test is finished.
        let last_run_status = last_run_status.into_external();
        let _ = resp_tx.send(ExecutorEvent::Finished {
            stress_index,
            test_instance: test.instance,
            success_output: settings.success_output(),
            failure_output: settings.failure_output(),
            junit_store_success_output: settings.junit_store_success_output(),
            junit_store_failure_output: settings.junit_store_failure_output(),
            last_run_status,
        });
    }

    // ---
    // Helper methods
    // ---

    /// Run an individual setup script in its own process.
    #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
    async fn run_setup_script(
        &self,
        script: SetupScriptPacket<'a>,
        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    ) -> InternalSetupScriptExecuteStatus<'a> {
        let mut stopwatch = crate::time::stopwatch();

        match self
            .run_setup_script_inner(script.clone(), &mut stopwatch, resp_tx, req_rx)
            .await
        {
            Ok(status) => status,
            Err(error) => InternalSetupScriptExecuteStatus {
                script,
                slow_after: None,
                output: ChildExecutionOutput::StartError(error),
                result: ExecutionResult::ExecFail,
                stopwatch_end: stopwatch.snapshot(),
                env_map: None,
            },
        }
    }

    #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
    async fn run_setup_script_inner(
        &self,
        script: SetupScriptPacket<'a>,
        stopwatch: &mut StopwatchStart,
        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    ) -> Result<InternalSetupScriptExecuteStatus<'a>, ChildStartError> {
        let mut cmd =
            script.make_command(self.profile.name(), &self.double_spawn, self.test_list)?;
        let command_mut = cmd.command_mut();

        command_mut.env("NEXTEST_RUN_ID", format!("{}", self.run_id));
        command_mut.stdin(Stdio::null());
        super::os::set_process_group(command_mut);

        // If creating a job fails, we might be on an old system. Ignore this -- job objects are a
        // best-effort thing.
        let job = super::os::Job::create().ok();

        // The --no-capture CLI argument overrides the config.
        if self.capture_strategy != CaptureStrategy::None {
            if script.config.capture_stdout {
                command_mut.stdout(std::process::Stdio::piped());
            }
            if script.config.capture_stderr {
                command_mut.stderr(std::process::Stdio::piped());
            }
        }

        let (mut child, env_path) = cmd
            .spawn()
            .map_err(|error| ChildStartError::Spawn(Arc::new(error)))?;
        let child_pid = child
            .id()
            .expect("child has never been polled so must return a PID");

        // If assigning the child to the job fails, ignore this. This can happen if the process has
        // exited.
        let _ = super::os::assign_process_to_job(&child, job.as_ref());

        let mut status: Option<ExecutionResult> = None;
        // Unlike with tests, we don't automatically assume setup scripts are slow if they take a
        // long time. For example, consider a setup script that performs a cargo build -- it can
        // take an indeterminate amount of time. That's why we set a very large slow timeout rather
        // than the test default of 60 seconds.
        let slow_timeout = script
            .config
            .slow_timeout
            .unwrap_or(SlowTimeout::VERY_LARGE);
        let leak_timeout = script.config.leak_timeout.unwrap_or_default();

        let mut interval_sleep = std::pin::pin!(crate::time::pausable_sleep(slow_timeout.period));

        let mut timeout_hit = 0;

        let child_fds = ChildFds::new_split(child.stdout.take(), child.stderr.take());
        let mut child_acc = ChildAccumulator::new(child_fds);

        let mut cx = UnitContext {
            packet: UnitPacket::SetupScript(script.clone()),
            slow_after: None,
        };

        let (res, leaked) = {
            let res = loop {
                tokio::select! {
                    () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
                    res = child.wait() => {
                        // The setup script finished executing.
                        break res;
                    }
                    _ = &mut interval_sleep, if status.is_none() => {
                        // Mark the script as slow.
                        cx.slow_after = Some(slow_timeout.period);

                        timeout_hit += 1;
                        let will_terminate = if let Some(terminate_after) = slow_timeout.terminate_after {
                            NonZeroUsize::new(timeout_hit as usize)
                                .expect("timeout_hit was just incremented")
                                >= terminate_after
                        } else {
                            false
                        };

                        if !slow_timeout.grace_period.is_zero() {
                            let _ = resp_tx.send(script.slow_event(
                                // Pass in the slow timeout period times timeout_hit, since
                                // stopwatch.elapsed() tends to be slightly longer.
                                timeout_hit * slow_timeout.period,
                                will_terminate.then_some(slow_timeout.grace_period),
                            ));
                        }

                        if will_terminate {
                            // Attempt to terminate the slow script. As there is
                            // a race between shutting down a slow script and
                            // its own completion, we silently ignore errors to
                            // avoid printing false warnings.
                            //
                            // The return result of terminate_child is not used
                            // here, since it is always marked as a timeout.
                            _ = super::os::terminate_child(
                                &cx,
                                &mut child,
                                &mut child_acc,
                                InternalTerminateReason::Timeout,
                                stopwatch,
                                req_rx,
                                job.as_ref(),
                                slow_timeout.grace_period,
                            ).await;
                            status = Some(ExecutionResult::Timeout);
                            if slow_timeout.grace_period.is_zero() {
                                break child.wait().await;
                            }
                            // Don't break here to give the wait task a chance to finish.
                        } else {
                            interval_sleep.as_mut().reset_last_duration();
                        }
                    }
                    recv = req_rx.recv() => {
                        // The sender stays open longer than the whole loop, and the buffer is big
                        // enough for all messages ever sent through this channel, so a RecvError
                        // should never happen.
                        let req = recv.expect("a RecvError should never happen here");

                        match req {
                            RunUnitRequest::Signal(req) => {
                                #[cfg_attr(not(windows), expect(unused_variables))]
                                let res = handle_signal_request(
                                    &cx,
                                    &mut child,
                                    &mut child_acc,
                                    req,
                                    stopwatch,
                                    interval_sleep.as_mut(),
                                    req_rx,
                                    job.as_ref(),
                                    slow_timeout.grace_period
                                ).await;

                                // On Unix, the signal the process exited with
                                // will be picked up by child.wait. On Windows,
                                // termination by job object will show up as
                                // exit code 1 -- we need to be clearer about
                                // that in the UI.
                                //
                                // TODO: Can we do something useful with res on
                                // Unix? For example, it's possible that the
                                // signal we send is not the same as the signal
                                // the child exits with. This might be a good
                                // thing to store in whatever test event log we
                                // end up building.
                                #[cfg(windows)]
                                {
                                    if matches!(
                                        res,
                                        HandleSignalResult::Terminated(super::TerminateChildResult::Killed)
                                    ) {
                                        status = Some(ExecutionResult::Fail {
                                            failure_status: FailureStatus::Abort(
                                                crate::reporter::events::AbortStatus::JobObject,
                                            ),
                                            leaked: false,
                                        });
                                    }
                                }
                            }
                            RunUnitRequest::OtherCancel => {
                                // Ignore non-signal cancellation requests --
                                // let the script finish.
                            }
                            RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
                                _ = sender.send(script.info_response(
                                    UnitState::Running {
                                        pid: child_pid,
                                        time_taken: stopwatch.snapshot().active,
                                        slow_after: cx.slow_after,
                                    },
                                    child_acc.snapshot_in_progress(UnitKind::WAITING_ON_SCRIPT_MESSAGE),
                                ));
                            }
                        }
                    }
                }
            };

            // Build a tentative status using status and the exit status.
            let tentative_status = status.or_else(|| {
                res.as_ref().ok().map(|res| {
                    create_execution_result(*res, &child_acc.errors, false, LeakTimeoutResult::Pass)
                })
            });

            let leaked = detect_fd_leaks(
                &cx,
                child_pid,
                &mut child_acc,
                tentative_status,
                leak_timeout,
                stopwatch,
                req_rx,
            )
            .await;

            (res, leaked)
        };

        let exit_status = match res {
            Ok(exit_status) => Some(exit_status),
            Err(err) => {
                child_acc.errors.push(ChildFdError::Wait(Arc::new(err)));
                None
            }
        };

        let exit_status = exit_status.expect("None always results in early return");

        let exec_result = status.unwrap_or_else(|| {
            create_execution_result(exit_status, &child_acc.errors, leaked, leak_timeout.result)
        });

        // Parse the environment file. If there's an error here, add it to the
        // list of child errors.
        let mut errors: Vec<_> = child_acc.errors.into_iter().map(ChildError::from).collect();
        let env_map = if exec_result.is_success() {
            match parse_env_file(&env_path).await {
                Ok(env_map) => Some(env_map),
                Err(error) => {
                    errors.push(ChildError::SetupScriptOutput(error));
                    None
                }
            }
        } else {
            None
        };

        Ok(InternalSetupScriptExecuteStatus {
            script,
            slow_after: cx.slow_after,
            output: ChildExecutionOutput::Output {
                result: Some(exec_result),
                output: child_acc.output.freeze(),
                errors: ErrorList::new(UnitKind::WAITING_ON_SCRIPT_MESSAGE, errors),
            },
            result: exec_result,
            stopwatch_end: stopwatch.snapshot(),
            env_map,
        })
    }

    /// Run an individual test in its own process.
    #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
    async fn run_test(
        &self,
        test: TestPacket<'a>,
        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    ) -> InternalExecuteStatus<'a> {
        let mut stopwatch = crate::time::stopwatch();

        match self
            .run_test_inner(test.clone(), &mut stopwatch, resp_tx, req_rx)
            .await
        {
            Ok(run_status) => run_status,
            Err(error) => InternalExecuteStatus {
                test,
                slow_after: None,
                output: ChildExecutionOutput::StartError(error),
                result: ExecutionResult::ExecFail,
                stopwatch_end: stopwatch.snapshot(),
            },
        }
    }

    async fn run_test_inner(
        &self,
        test: TestPacket<'a>,
        stopwatch: &mut StopwatchStart,
        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    ) -> Result<InternalExecuteStatus<'a>, ChildStartError> {
        let ctx = TestExecuteContext {
            profile_name: self.profile.name(),
            double_spawn: &self.double_spawn,
            target_runner: &self.target_runner,
        };
        let mut cmd = test.test_instance.make_command(
            &ctx,
            self.test_list,
            test.settings.run_wrapper(),
            test.settings.run_extra_args(),
        );
        let command_mut = cmd.command_mut();

        // Debug environment variable for testing.
        command_mut.env("__NEXTEST_ATTEMPT", format!("{}", test.retry_data.attempt));

        command_mut.env("NEXTEST_RUN_ID", format!("{}", self.run_id));

        // Set group and slot environment variables.
        command_mut.env(
            "NEXTEST_TEST_GLOBAL_SLOT",
            test.cx.global_slot().to_string(),
        );
        match test.settings.test_group() {
            TestGroup::Custom(name) => {
                debug_assert!(
                    test.cx.group_slot().is_some(),
                    "test_group being set implies group_slot is set"
                );
                command_mut.env("NEXTEST_TEST_GROUP", name.as_str());
            }
            TestGroup::Global => {
                debug_assert!(
                    test.cx.group_slot().is_none(),
                    "test_group being unset implies group_slot is unset"
                );
                command_mut.env("NEXTEST_TEST_GROUP", TestGroup::GLOBAL_STR);
            }
        }
        if let Some(group_slot) = test.cx.group_slot() {
            command_mut.env("NEXTEST_TEST_GROUP_SLOT", group_slot.to_string());
        } else {
            command_mut.env("NEXTEST_TEST_GROUP_SLOT", "none");
        }

        command_mut.stdin(Stdio::null());
        test.setup_script_data.apply(
            &test.test_instance.to_test_query(),
            &self.profile.filterset_ecx(),
            command_mut,
        );
        super::os::set_process_group(command_mut);

        // If creating a job fails, we might be on an old system. Ignore this -- job objects are a
        // best-effort thing.
        let job = super::os::Job::create().ok();

        let crate::test_command::Child {
            mut child,
            child_fds,
        } = cmd
            .spawn(self.capture_strategy)
            .map_err(|error| ChildStartError::Spawn(Arc::new(error)))?;

        // Note: The PID stored here must be used with care -- it might be
        // outdated and have been reused by the kernel in case the process
        // has exited. If using it for any real logic (not just reporting),
        // it might be best to always check child.id().
        let child_pid = child
            .id()
            .expect("child has never been polled so must return a PID");

        // If assigning the child to the job fails, ignore this. This can happen if the process has
        // exited.
        let _ = super::os::assign_process_to_job(&child, job.as_ref());

        let mut child_acc = ChildAccumulator::new(child_fds);

        let mut status: Option<ExecutionResult> = None;
        let slow_timeout = test.settings.slow_timeout();
        let leak_timeout = test.settings.leak_timeout();

        // Use a pausable_sleep rather than an interval here because it's much
        // harder to pause and resume an interval.
        let mut interval_sleep = std::pin::pin!(crate::time::pausable_sleep(slow_timeout.period));

        let mut timeout_hit = 0;

        let mut cx = UnitContext {
            packet: UnitPacket::Test(test.clone()),
            slow_after: None,
        };

        let (res, leaked) = {
            let res = loop {
                tokio::select! {
                    () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
                    res = child.wait() => {
                        // The test finished executing.
                        break res;
                    }
                    _ = &mut interval_sleep, if status.is_none() => {
                        // Mark the test as slow.
                        cx.slow_after = Some(slow_timeout.period);

                        timeout_hit += 1;
                        let will_terminate = if let Some(terminate_after) = slow_timeout.terminate_after {
                            NonZeroUsize::new(timeout_hit as usize)
                                .expect("timeout_hit was just incremented")
                                >= terminate_after
                        } else {
                            false
                        };

                        if !slow_timeout.grace_period.is_zero() {
                            let _ = resp_tx.send(test.slow_event(
                                // Pass in the slow timeout period times timeout_hit, since
                                // stopwatch.elapsed() tends to be slightly longer.
                                timeout_hit * slow_timeout.period,
                                will_terminate.then_some(slow_timeout.grace_period),
                            ));
                        }

                        if will_terminate {
                            // Attempt to terminate the slow test. As there is a
                            // race between shutting down a slow test and its
                            // own completion, we silently ignore errors to
                            // avoid printing false warnings.
                            //
                            // The return result of terminate_child is not used
                            // here, since it is always marked as a timeout.
                            _ = super::os::terminate_child(
                                &cx,
                                &mut child,
                                &mut child_acc,
                                InternalTerminateReason::Timeout,
                                stopwatch,
                                req_rx,
                                job.as_ref(),
                                slow_timeout.grace_period,
                            ).await;
                            status = Some(ExecutionResult::Timeout);
                            if slow_timeout.grace_period.is_zero() {
                                break child.wait().await;
                            }
                            // Don't break here to give the wait task a chance to finish.
                        } else {
                            interval_sleep.as_mut().reset_last_duration();
                        }
                    }
                    recv = req_rx.recv() => {
                        // The sender stays open longer than the whole loop so a
                        // RecvError should never happen.
                        let req = recv.expect("req_rx sender is open");

                        match req {
                            RunUnitRequest::Signal(req) => {
                                #[cfg_attr(not(windows), expect(unused_variables))]
                                let res = handle_signal_request(
                                    &cx,
                                    &mut child,
                                    &mut child_acc,
                                    req,
                                    stopwatch,
                                    interval_sleep.as_mut(),
                                    req_rx,
                                    job.as_ref(),
                                    slow_timeout.grace_period,
                                ).await;

                                // On Unix, the signal the process exited with
                                // will be picked up by child.wait. On Windows,
                                // termination by job object will show up as
                                // exit code 1 -- we need to be clearer about
                                // that in the UI.
                                //
                                // TODO: Can we do something useful with res on
                                // Unix? For example, it's possible that the
                                // signal we send is not the same as the signal
                                // the child exits with. This might be a good
                                // thing to store in whatever test event log we
                                // end up building.
                                #[cfg(windows)]
                                {
                                    if matches!(
                                        res,
                                        HandleSignalResult::Terminated(super::TerminateChildResult::Killed)
                                    ) {
                                        status = Some(ExecutionResult::Fail {
                                            failure_status: FailureStatus::Abort(
                                                crate::reporter::events::AbortStatus::JobObject,
                                            ),
                                            leaked: false,
                                        });
                                    }
                                }
                            }
                            RunUnitRequest::OtherCancel => {
                                // Ignore non-signal cancellation requests --
                                // let the test finish.
                            }
                            RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
                                _ = tx.send(test.info_response(
                                    UnitState::Running {
                                        pid: child_pid,
                                        time_taken: stopwatch.snapshot().active,
                                        slow_after: cx.slow_after,
                                    },
                                    child_acc.snapshot_in_progress(UnitKind::WAITING_ON_TEST_MESSAGE),
                                ));
                            }
                        }
                    }
                };
            };

            // Build a tentative status using status and the exit status.
            let tentative_status = status.or_else(|| {
                res.as_ref().ok().map(|res| {
                    create_execution_result(*res, &child_acc.errors, false, LeakTimeoutResult::Pass)
                })
            });

            let leaked = detect_fd_leaks(
                &cx,
                child_pid,
                &mut child_acc,
                tentative_status,
                leak_timeout,
                stopwatch,
                req_rx,
            )
            .await;

            (res, leaked)
        };

        let exit_status = match res {
            Ok(exit_status) => Some(exit_status),
            Err(err) => {
                child_acc.errors.push(ChildFdError::Wait(Arc::new(err)));
                None
            }
        };

        let exit_status = exit_status.expect("None always results in early return");
        let exec_result = status.unwrap_or_else(|| {
            create_execution_result(exit_status, &child_acc.errors, leaked, leak_timeout.result)
        });

        Ok(InternalExecuteStatus {
            test,
            slow_after: cx.slow_after,
            output: ChildExecutionOutput::Output {
                result: Some(exec_result),
                output: child_acc.output.freeze(),
                errors: ErrorList::new(UnitKind::WAITING_ON_TEST_MESSAGE, child_acc.errors),
            },
            result: exec_result,
            stopwatch_end: stopwatch.snapshot(),
        })
    }
}
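
// The started-unit handshake used above in `run_setup_scripts` and
// `run_test_instance`, in isolation: the executor sends an event carrying a
// oneshot sender, and the dispatcher either hands back a per-unit request
// channel or drops the sender to tell the unit to exit. A minimal sketch with
// plain tokio channels -- the names and the message type are illustrative,
// not part of the real event types.
#[cfg(test)]
mod handshake_sketch {
    use tokio::sync::{mpsc, oneshot};

    #[tokio::test]
    async fn dispatcher_hands_back_request_channel() {
        type ReqRx = mpsc::UnboundedReceiver<&'static str>;
        let (event_tx, mut event_rx) = mpsc::unbounded_channel::<oneshot::Sender<ReqRx>>();

        // Executor side: announce the unit and wait for its request channel.
        let (req_rx_tx, req_rx_rx) = oneshot::channel();
        event_tx.send(req_rx_tx).unwrap();

        // Dispatcher side: create the per-unit channel and send the receiver
        // back through the oneshot.
        let req_rx_tx = event_rx.recv().await.unwrap();
        let (req_tx, req_rx) = mpsc::unbounded_channel();
        req_rx_tx.send(req_rx).unwrap();
        req_tx.send("get-info").unwrap();

        // Executor side: a dropped sender would mean "exit"; here we got a
        // channel and can serve requests on it.
        let mut req_rx = req_rx_rx.await.expect("dispatcher is alive");
        assert_eq!(req_rx.recv().await, Some("get-info"));
    }
}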

#[derive(Debug)]
struct BackoffIter {
    policy: RetryPolicy,
    current_factor: f64,
    remaining_attempts: usize,
}

impl BackoffIter {
    const BACKOFF_EXPONENT: f64 = 2.;

    fn new(policy: RetryPolicy) -> Self {
        let remaining_attempts = policy.count();
        Self {
            policy,
            current_factor: 1.,
            remaining_attempts,
        }
    }

    fn next_delay_and_jitter(&mut self) -> (Duration, bool) {
        match self.policy {
            RetryPolicy::Fixed { delay, jitter, .. } => (delay, jitter),
            RetryPolicy::Exponential {
                delay,
                jitter,
                max_delay,
                ..
            } => {
                let factor = self.current_factor;
                let exp_delay = delay.mul_f64(factor);

                // Stop multiplying the exponential factor if the delay is greater than max_delay.
                if let Some(max_delay) = max_delay {
                    if exp_delay > max_delay {
                        return (max_delay, jitter);
                    }
                }

                let next_factor = self.current_factor * Self::BACKOFF_EXPONENT;
                self.current_factor = next_factor;

                (exp_delay, jitter)
            }
        }
    }

    fn apply_jitter(duration: Duration) -> Duration {
        let jitter: f64 = rand::rng().sample(OpenClosed01);
        // Scale the delay by a jitter factor in the range (0.5, 1].
        duration.mul_f64(0.5 + jitter / 2.)
    }
}

impl Iterator for BackoffIter {
    type Item = Duration;
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining_attempts > 0 {
            let (mut delay, jitter) = self.next_delay_and_jitter();
            if jitter {
                delay = Self::apply_jitter(delay);
            }
            self.remaining_attempts -= 1;
            Some(delay)
        } else {
            None
        }
    }
}
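
// The backoff math above in isolation: a self-contained sketch (not part of
// the runner) of how the exponent and the max_delay cap compose. The base
// delay, cap, and attempt count are illustrative values, not nextest
// defaults.
#[cfg(test)]
mod backoff_sketch {
    use std::time::Duration;

    #[test]
    fn exponential_delays_are_capped() {
        let base = Duration::from_secs(1);
        let max = Duration::from_secs(4);
        let mut factor = 1.0_f64;
        let mut delays = Vec::new();
        for _ in 0..5 {
            // Same doubling as BackoffIter::BACKOFF_EXPONENT, with the
            // max_delay cap applied to the result.
            delays.push(base.mul_f64(factor).min(max));
            factor *= 2.0;
        }
        assert_eq!(delays, [1, 2, 4, 4, 4].map(Duration::from_secs));
    }
}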

/// Either a test or a setup script, along with information about how long the
/// unit took.
pub(super) struct UnitContext<'a> {
    packet: UnitPacket<'a>,
    // TODO: This is a bit of a mess. It isn't clear where this kind of state
    // should live -- many parts of the request-response system need various
    // pieces of this state.
    slow_after: Option<Duration>,
}

impl<'a> UnitContext<'a> {
    pub(super) fn packet(&self) -> &UnitPacket<'a> {
        &self.packet
    }

    pub(super) fn info_response(
        &self,
        state: UnitState,
        output: ChildExecutionOutput,
    ) -> InfoResponse<'a> {
        match &self.packet {
            UnitPacket::SetupScript(packet) => packet.info_response(state, output),
            UnitPacket::Test(packet) => packet.info_response(state, output),
        }
    }
}

#[derive(Clone, Debug)]
pub(super) enum UnitPacket<'a> {
    SetupScript(SetupScriptPacket<'a>),
    Test(TestPacket<'a>),
}

impl UnitPacket<'_> {
    pub(super) fn kind(&self) -> UnitKind {
        match self {
            Self::SetupScript(_) => UnitKind::Script,
            Self::Test(_) => UnitKind::Test,
        }
    }
}

#[derive(Clone)]
pub(super) struct TestPacket<'a> {
    stress_index: Option<StressIndex>,
    test_instance: TestInstance<'a>,
    cx: FutureQueueContext,
    retry_data: RetryData,
    settings: Arc<TestSettings<'a>>,
    setup_script_data: Arc<SetupScriptExecuteData<'a>>,
    delay_before_start: Duration,
}

impl<'a> TestPacket<'a> {
    fn slow_event(&self, elapsed: Duration, will_terminate: Option<Duration>) -> ExecutorEvent<'a> {
        ExecutorEvent::Slow {
            stress_index: self.stress_index,
            test_instance: self.test_instance,
            retry_data: self.retry_data,
            elapsed,
            will_terminate,
        }
    }

    pub(super) fn retry_data(&self) -> RetryData {
        self.retry_data
    }

    pub(super) fn delay_before_start(&self) -> Duration {
        self.delay_before_start
    }

    pub(super) fn info_response(
        &self,
        state: UnitState,
        output: ChildExecutionOutput,
    ) -> InfoResponse<'a> {
        InfoResponse::Test(TestInfoResponse {
            stress_index: self.stress_index,
            test_instance: self.test_instance.id(),
            state,
            retry_data: self.retry_data,
            output,
        })
    }
}

impl fmt::Debug for TestPacket<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TestPacket")
            .field("test_instance", &self.test_instance.id())
            .field("cx", &self.cx)
            .finish_non_exhaustive()
    }
}

#[derive(Clone, Debug)]
pub(super) struct SetupScriptPacket<'a> {
    stress_index: Option<StressIndex>,
    script_id: ScriptId,
    config: &'a SetupScriptConfig,
    program: String,
}

impl<'a> SetupScriptPacket<'a> {
    /// Turns self into a command that can be executed.
    fn make_command(
        &self,
        profile_name: &str,
        double_spawn: &DoubleSpawnInfo,
        test_list: &TestList<'_>,
    ) -> Result<SetupScriptCommand, ChildStartError> {
        SetupScriptCommand::new(self.config, profile_name, double_spawn, test_list)
    }

    fn slow_event(&self, elapsed: Duration, will_terminate: Option<Duration>) -> ExecutorEvent<'a> {
        ExecutorEvent::SetupScriptSlow {
            stress_index: self.stress_index,
            script_id: self.script_id.clone(),
            config: self.config,
            program: self.program.clone(),
            elapsed,
            will_terminate,
        }
    }

    pub(super) fn info_response(
        &self,
        state: UnitState,
        output: ChildExecutionOutput,
    ) -> InfoResponse<'a> {
        InfoResponse::SetupScript(SetupScriptInfoResponse {
            stress_index: self.stress_index,
            script_id: self.script_id.clone(),
            program: self.program.clone(),
            args: &self.config.command.args,
            state,
            output,
        })
    }
}

/// Drains the request receiver of any messages.
fn drain_req_rx<'a>(
    mut receiver: UnboundedReceiver<RunUnitRequest<'a>>,
    status: UnitExecuteStatus<'a, '_>,
) {
    // Mark the receiver closed so no further messages are sent.
    receiver.close();
    loop {
        // Receive anything that's left in the receiver.
        let message = receiver.try_recv();
        match message {
            Ok(message) => {
                message.drain(status);
            }
            Err(_) => {
                break;
            }
        }
    }
}
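
// The close-then-drain pattern above, in isolation, with a plain tokio
// unbounded channel (a minimal sketch; the message type is illustrative):
#[cfg(test)]
mod drain_sketch {
    use tokio::sync::mpsc;

    #[test]
    fn close_then_drain_sees_buffered_messages() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        tx.send(1).unwrap();
        tx.send(2).unwrap();

        // After close(), senders can no longer enqueue new messages...
        rx.close();
        assert!(tx.send(3).is_err());

        // ...but everything already buffered is still received.
        assert_eq!(rx.try_recv().ok(), Some(1));
        assert_eq!(rx.try_recv().ok(), Some(2));
        assert!(rx.try_recv().is_err());
    }
}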

async fn handle_delay_between_attempts<'a>(
    packet: &TestPacket<'a>,
    previous_result: ExecutionResult,
    previous_slow: bool,
    delay: Duration,
    req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
) {
    let mut sleep = std::pin::pin!(crate::time::pausable_sleep(delay));
    #[cfg_attr(not(unix), expect(unused_mut))]
    let mut waiting_stopwatch = crate::time::stopwatch();

    loop {
        tokio::select! {
            _ = &mut sleep => {
                // The timer has expired.
                break;
            }
            recv = req_rx.recv() => {
                let req = recv.expect("req_rx sender is open");

                match req {
                    #[cfg(unix)]
                    RunUnitRequest::Signal(SignalRequest::Stop(tx)) => {
                        sleep.as_mut().pause();
                        waiting_stopwatch.pause();
                        _ = tx.send(());
                    }
                    #[cfg(unix)]
                    RunUnitRequest::Signal(SignalRequest::Continue) => {
                        if sleep.is_paused() {
                            sleep.as_mut().resume();
                            waiting_stopwatch.resume();
                        }
                    }
                    RunUnitRequest::Signal(SignalRequest::Shutdown(_)) => {
                        // The run was cancelled, so go ahead and perform a
                        // shutdown.
                        break;
                    }
                    RunUnitRequest::OtherCancel => {
                        // If a cancellation was requested, break out of the
                        // loop.
                        break;
                    }
                    RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
                        let waiting_snapshot = waiting_stopwatch.snapshot();
                        _ = tx.send(
                            packet.info_response(
                                UnitState::DelayBeforeNextAttempt {
                                    previous_result,
                                    previous_slow,
                                    waiting_duration: waiting_snapshot.active,
                                    remaining: delay
                                        .checked_sub(waiting_snapshot.active)
                                        .unwrap_or_default(),
                                },
                                // This field is ignored but our data model
                                // requires it.
                                ChildExecutionOutput::Output {
                                    result: None,
                                    output: ChildOutput::Split(ChildSplitOutput {
                                        stdout: None,
                                        stderr: None,
                                    }),
                                    errors: None,
                                },
                            ),
                        );
                    }
                }
            }
        }
    }
}
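
// The interruptible-delay loop above, in isolation: sleep for the retry
// delay, but wake early if a shutdown request arrives. A minimal sketch with
// a plain tokio sleep standing in for the pausable sleep and the richer
// request enum -- the channel, message type, and duration are illustrative.
#[cfg(test)]
mod delay_sketch {
    use std::time::Duration;
    use tokio::sync::mpsc;

    #[tokio::test]
    async fn shutdown_cuts_the_delay_short() {
        let (tx, mut rx) = mpsc::unbounded_channel::<&'static str>();
        tx.send("shutdown").unwrap();

        let mut sleep = std::pin::pin!(tokio::time::sleep(Duration::from_secs(3600)));
        let woke_early = loop {
            tokio::select! {
                _ = &mut sleep => break false,
                msg = rx.recv() => {
                    if msg == Some("shutdown") {
                        break true;
                    }
                }
            }
        };
        assert!(woke_early);
    }
}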

/// After a child process has exited, detect if it leaked file handles by
/// leaving long-running grandchildren open.
///
/// This is done by waiting for a short period of time after the child has
/// exited, and checking if stdout and stderr are still open. In the future, we
/// could do more sophisticated checks around e.g. if any processes with the
/// same PGID are around.
async fn detect_fd_leaks<'a>(
    cx: &UnitContext<'a>,
    child_pid: u32,
    child_acc: &mut ChildAccumulator,
    tentative_result: Option<ExecutionResult>,
    leak_timeout: LeakTimeout,
    stopwatch: &mut StopwatchStart,
    req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
) -> bool {
    loop {
        // Ignore stop and continue events here since the leak timeout should be very small.
        // TODO: we may want to consider them.
        let mut sleep = std::pin::pin!(tokio::time::sleep(leak_timeout.period));
        let waiting_stopwatch = crate::time::stopwatch();

        tokio::select! {
            // All of the branches here need to check for
            // `!child_acc.fds.is_done()`, because if child_fds is done we want
            // to hit the `else` block right away.
            () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
            () = &mut sleep, if !child_acc.fds.is_done() => {
                break true;
            }
            recv = req_rx.recv(), if !child_acc.fds.is_done() => {
                // The sender stays open longer than the whole loop, and the
                // buffer is big enough for all messages ever sent through this
                // channel, so a RecvError should never happen.
                let req = recv.expect("a RecvError should never happen here");

                match req {
                    RunUnitRequest::Signal(_) => {
                        // The process is done executing, so signals are moot.
                    }
                    RunUnitRequest::OtherCancel => {
                        // Ignore non-signal cancellation requests -- let the
                        // unit finish.
                    }
                    RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
                        let snapshot = waiting_stopwatch.snapshot();
                        let resp = cx.info_response(
                            UnitState::Exiting {
                                // Because we've polled that the child is done,
                                // child.id() will likely return None at this
                                // point. Use the cached PID since this is just
                                // for reporting.
                                pid: child_pid,
                                time_taken: stopwatch.snapshot().active,
                                slow_after: cx.slow_after,
                                tentative_result,
                                waiting_duration: snapshot.active,
                                remaining: leak_timeout.period
                                    .checked_sub(snapshot.active)
                                    .unwrap_or_default(),
                            },
                            child_acc.snapshot_in_progress(cx.packet.kind().waiting_on_message()),
                        );

                        _ = sender.send(resp);
                    }
                }
            }
            else => {
                break false;
            }
        }
    }
}
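
// The `else` branch semantics the loop above relies on, in isolation: when
// every select! arm is disabled by its `if` precondition, the `else` arm
// runs immediately. A minimal sketch (illustrative only):
#[cfg(test)]
mod select_else_sketch {
    #[tokio::test]
    async fn else_runs_when_all_branches_are_disabled() {
        let fds_done = true;
        let leaked = tokio::select! {
            // Disabled by the precondition, so this future is never polled.
            () = std::future::pending::<()>(), if !fds_done => true,
            else => false,
        };
        assert!(!leaked);
    }
}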

// It would be nice to fix this function to not have so many arguments, but this
// code is actively being refactored right now and imposing too much structure
// can cause more harm than good.
#[expect(clippy::too_many_arguments)]
async fn handle_signal_request<'a>(
    cx: &UnitContext<'a>,
    child: &mut Child,
    child_acc: &mut ChildAccumulator,
    req: SignalRequest,
    stopwatch: &mut StopwatchStart,
    #[cfg_attr(not(unix), expect(unused_mut, unused_variables))] mut interval_sleep: Pin<
        &mut PausableSleep,
    >,
    req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    job: Option<&super::os::Job>,
    grace_period: Duration,
) -> HandleSignalResult {
    match req {
        #[cfg(unix)]
        SignalRequest::Stop(sender) => {
            // It isn't possible to receive a stop event twice since it gets
            // debounced in the main signal handler.
            stopwatch.pause();
            interval_sleep.as_mut().pause();
            super::os::job_control_child(child, crate::signal::JobControlEvent::Stop);
            // The receiver being dead probably means the main thread panicked
            // or similar.
            let _ = sender.send(());
            HandleSignalResult::JobControl
        }
        #[cfg(unix)]
        SignalRequest::Continue => {
            // It's possible to receive a resume event right at the beginning of
            // test execution, so debounce it.
            if stopwatch.is_paused() {
                stopwatch.resume();
                interval_sleep.as_mut().resume();
                super::os::job_control_child(child, crate::signal::JobControlEvent::Continue);
            }
            HandleSignalResult::JobControl
        }
        SignalRequest::Shutdown(event) => {
            let res = super::os::terminate_child(
                cx,
                child,
                child_acc,
                InternalTerminateReason::Signal(event),
                stopwatch,
                req_rx,
                job,
                grace_period,
            )
            .await;
            HandleSignalResult::Terminated(res)
        }
    }
}

fn create_execution_result(
    exit_status: ExitStatus,
    child_errors: &[ChildFdError],
    leaked: bool,
    leak_timeout_result: LeakTimeoutResult,
) -> ExecutionResult {
    if !child_errors.is_empty() {
        // If an error occurred while waiting on the child handles, treat it as
        // an execution failure.
        ExecutionResult::ExecFail
    } else if exit_status.success() {
        if leaked {
            // Note: this is test passed (exited with code 0) + leaked handles,
            // not test failed and also leaked handles.
            ExecutionResult::Leak {
                result: leak_timeout_result,
            }
        } else {
            ExecutionResult::Pass
        }
    } else {
        ExecutionResult::Fail {
            failure_status: FailureStatus::extract(exit_status),
            leaked,
        }
    }
}
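
// How the classification above plays out, as a minimal Unix-only sketch:
// `ExitStatus::from_raw` takes a raw wait status, so 0 is a clean exit and
// 1 << 8 encodes exit code 1. The assertions mirror the branches of
// `create_execution_result`; the inputs are illustrative.
#[cfg(all(test, unix))]
mod execution_result_sketch {
    use super::*;
    use std::os::unix::process::ExitStatusExt;

    #[test]
    fn classification_follows_exit_status_and_leaks() {
        let ok = ExitStatus::from_raw(0);
        let failed = ExitStatus::from_raw(1 << 8);

        // Clean exit, nothing leaked: a pass.
        assert!(matches!(
            create_execution_result(ok, &[], false, LeakTimeoutResult::Pass),
            ExecutionResult::Pass
        ));
        // Clean exit but handles were leaked: a leak, carrying the configured
        // leak-timeout result.
        assert!(matches!(
            create_execution_result(ok, &[], true, LeakTimeoutResult::Pass),
            ExecutionResult::Leak { .. }
        ));
        // Non-zero exit: a failure, with the leak flag carried along.
        assert!(matches!(
            create_execution_result(failed, &[], false, LeakTimeoutResult::Pass),
            ExecutionResult::Fail { .. }
        ));
    }
}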