nextest_runner/runner/imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6    config::{
7        core::EvaluatableProfile,
8        elements::{FlakyResult, MaxFail, RetryPolicy, TestGroup, TestThreads},
9        scripts::SetupScriptExecuteData,
10    },
11    double_spawn::DoubleSpawnInfo,
12    errors::{
13        ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14        TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15    },
16    input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17    list::{OwnedTestInstanceId, TestInstanceWithSettings, TestList},
18    reporter::events::{ReporterEvent, RunStats, StressIndex},
19    runner::ExecutorEvent,
20    signal::{SignalHandler, SignalHandlerKind},
21    target_runner::TargetRunner,
22    test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use chrono::{DateTime, Local};
26use future_queue::{FutureQueueContext, StreamExt};
27use futures::{future::Fuse, prelude::*};
28use nextest_metadata::FilterMatch;
29use quick_junit::ReportUuid;
30use semver::Version;
31use std::{
32    collections::BTreeSet, convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr,
33    sync::Arc, time::Duration,
34};
35use tokio::{
36    runtime::Runtime,
37    sync::{mpsc::unbounded_channel, oneshot},
38    task::JoinError,
39};
40use tracing::{debug, warn};
41
/// A parsed debugger command.
///
/// Built from a shell-quoted command line via the [`FromStr`] impl below.
#[derive(Clone, Debug)]
pub struct DebuggerCommand {
    /// The debugger executable: the first word of the parsed command line.
    program: String,
    /// The remaining words of the parsed command line, passed as arguments.
    args: Vec<String>,
}
48
impl DebuggerCommand {
    /// Gets the program.
    pub fn program(&self) -> &str {
        // The FromStr impl rejects empty commands, so `program` always holds
        // the first word of the parsed command line.
        &self.program
    }

    /// Gets the arguments.
    pub fn args(&self) -> &[String] {
        &self.args
    }
}
61
62impl FromStr for DebuggerCommand {
63    type Err = DebuggerCommandParseError;
64
65    fn from_str(command: &str) -> Result<Self, Self::Err> {
66        let mut parts =
67            shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
68        if parts.is_empty() {
69            return Err(DebuggerCommandParseError::EmptyCommand);
70        }
71        let program = parts.remove(0);
72        Ok(Self {
73            program,
74            args: parts,
75        })
76    }
77}
78
/// A parsed tracer command.
///
/// Built from a shell-quoted command line via the [`FromStr`] impl below.
#[derive(Clone, Debug)]
pub struct TracerCommand {
    /// The tracer executable: the first word of the parsed command line.
    program: String,
    /// The remaining words of the parsed command line, passed as arguments.
    args: Vec<String>,
}
85
impl TracerCommand {
    /// Gets the program.
    pub fn program(&self) -> &str {
        // The FromStr impl rejects empty commands, so `program` always holds
        // the first word of the parsed command line.
        &self.program
    }

    /// Gets the arguments.
    pub fn args(&self) -> &[String] {
        &self.args
    }
}
97
98impl FromStr for TracerCommand {
99    type Err = TracerCommandParseError;
100
101    fn from_str(command: &str) -> Result<Self, Self::Err> {
102        let mut parts =
103            shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
104        if parts.is_empty() {
105            return Err(TracerCommandParseError::EmptyCommand);
106        }
107        let program = parts.remove(0);
108        Ok(Self {
109            program,
110            args: parts,
111        })
112    }
113}
114
/// An interceptor wraps test execution with a debugger or tracer.
///
/// The default is [`Interceptor::None`] (standard execution).
#[derive(Clone, Debug, Default)]
pub enum Interceptor {
    /// No interceptor - standard test execution.
    #[default]
    None,

    /// Run the test under a debugger.
    Debugger(DebuggerCommand),

    /// Run the test under a syscall tracer.
    Tracer(TracerCommand),
}
128
129impl Interceptor {
130    /// Returns true if timeouts should be disabled.
131    ///
132    /// Both debuggers and tracers disable timeouts.
133    pub fn should_disable_timeouts(&self) -> bool {
134        match self {
135            Interceptor::None => false,
136            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
137        }
138    }
139
140    /// Returns true if stdin should be passed through to child test processes.
141    ///
142    /// Only debuggers need stdin passthrough for interactive debugging.
143    pub fn should_passthrough_stdin(&self) -> bool {
144        match self {
145            Interceptor::None | Interceptor::Tracer(_) => false,
146            Interceptor::Debugger(_) => true,
147        }
148    }
149
150    /// Returns true if a process group should be created for the child.
151    ///
152    /// Debuggers need terminal control, so no process group is created. Tracers
153    /// work fine with process groups.
154    pub fn should_create_process_group(&self) -> bool {
155        match self {
156            Interceptor::None | Interceptor::Tracer(_) => true,
157            Interceptor::Debugger(_) => false,
158        }
159    }
160
161    /// Returns true if leak detection should be skipped.
162    ///
163    /// Both debuggers and tracers skip leak detection to avoid interference.
164    pub fn should_skip_leak_detection(&self) -> bool {
165        match self {
166            Interceptor::None => false,
167            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
168        }
169    }
170
171    /// Returns true if the test command should be displayed.
172    ///
173    /// Used to determine if we should print the wrapper command for debugging.
174    pub fn should_show_wrapper_command(&self) -> bool {
175        match self {
176            Interceptor::None => false,
177            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
178        }
179    }
180
181    /// Returns true if, on receiving SIGTSTP, we should send SIGTSTP to the
182    /// child.
183    ///
184    /// Debugger mode has special signal handling where we don't send SIGTSTP to
185    /// the child (it receives it directly from the terminal, since no process
186    /// group is created). Tracers use standard signal handling.
187    pub fn should_send_sigtstp(&self) -> bool {
188        match self {
189            Interceptor::None | Interceptor::Tracer(_) => true,
190            Interceptor::Debugger(_) => false,
191        }
192    }
193}
194
/// Version-related environment variables set for tests and setup scripts.
///
/// These expose the current nextest version and any version constraints from
/// the repository's configuration.
#[derive(Clone, Debug)]
pub struct VersionEnvVars {
    /// The current nextest version.
    pub current_version: Version,

    /// The required nextest version from configuration, if any.
    pub required_version: Option<Version>,

    /// The recommended nextest version from configuration, if any.
    pub recommended_version: Option<Version>,
}
210
211impl VersionEnvVars {
212    /// Applies the version environment variables to a command.
213    pub(super) fn apply_env(&self, cmd: &mut std::process::Command) {
214        cmd.env("NEXTEST_VERSION", self.current_version.to_string());
215        cmd.env(
216            "NEXTEST_REQUIRED_VERSION",
217            match &self.required_version {
218                Some(v) => v.to_string(),
219                None => "none".to_owned(),
220            },
221        );
222        cmd.env(
223            "NEXTEST_RECOMMENDED_VERSION",
224            match &self.recommended_version {
225                Some(v) => v.to_string(),
226                None => "none".to_owned(),
227            },
228        );
229    }
230}
231
/// A child process identifier: either a single process or a process group.
#[derive(Copy, Clone, Debug)]
pub(super) enum ChildPid {
    /// A single process ID.
    ///
    /// The value is only read on Unix (by `for_kill`); on other platforms it
    /// is intentionally unused.
    Process(#[cfg_attr(not(unix), expect(unused))] u32),

    /// A process group ID.
    #[cfg(unix)]
    ProcessGroup(u32),
}
242
243impl ChildPid {
244    /// Returns the PID value to use with `libc::kill`.
245    ///
246    /// - `Process(pid)` returns `pid as i32` (positive, kills single process).
247    /// - `ProcessGroup(pid)` returns `-(pid as i32)` (negative, kills process group).
248    ///
249    /// On Windows, always returns `pid as i32`.
250    #[cfg(unix)]
251    pub(super) fn for_kill(self) -> i32 {
252        match self {
253            ChildPid::Process(pid) => pid as i32,
254            ChildPid::ProcessGroup(pid) => -(pid as i32),
255        }
256    }
257}
258
/// Test runner options.
///
/// Unset fields fall back to profile-derived defaults in
/// [`TestRunnerBuilder::build`].
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    /// How test output is captured (split/combined/none).
    capture_strategy: CaptureStrategy,
    /// Retry policy override, if set.
    retries: Option<RetryPolicy>,
    /// Flaky-result behavior override, if set.
    flaky_result: Option<FlakyResult>,
    /// Max-fail override, if set.
    max_fail: Option<MaxFail>,
    /// Test parallelism override, if set.
    test_threads: Option<TestThreads>,
    /// Stress testing condition, if any.
    stress_condition: Option<StressCondition>,
    /// Debugger/tracer wrapper for test execution.
    interceptor: Interceptor,
    /// Expected tests for rerun tracking, if set.
    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
    /// Version env vars to expose to tests/scripts, if set.
    version_env_vars: Option<VersionEnvVars>,
}
272
impl TestRunnerBuilder {
    /// Sets the capture strategy for the test runner
    ///
    /// * [`CaptureStrategy::Split`]
    ///   * pro: output from `stdout` and `stderr` can be identified and easily split
    ///   * con: ordering between the streams cannot be guaranteed
    /// * [`CaptureStrategy::Combined`]
    ///   * pro: output is guaranteed to be ordered as it would in a terminal emulator
    ///   * con: distinction between `stdout` and `stderr` is lost
    /// * [`CaptureStrategy::None`] -
    ///   * In this mode, tests will always be run serially: `test_threads` will always be 1.
    pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
        self.capture_strategy = strategy;
        self
    }

    /// Sets the number of retries for this test runner.
    pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
        self.retries = Some(retries);
        self
    }

    /// Sets the flaky result behavior for this test runner.
    pub fn set_flaky_result(&mut self, flaky_result: FlakyResult) -> &mut Self {
        self.flaky_result = Some(flaky_result);
        self
    }

    /// Sets the max-fail value for this test runner.
    pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
        self.max_fail = Some(max_fail);
        self
    }

    /// Sets the number of tests to run simultaneously.
    pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
        self.test_threads = Some(test_threads);
        self
    }

    /// Sets the stress testing condition.
    pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
        self.stress_condition = Some(stress_condition);
        self
    }

    /// Sets the interceptor (debugger or tracer) to use for running tests.
    pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
        self.interceptor = interceptor;
        self
    }

    /// Sets the expected outstanding tests for rerun tracking.
    ///
    /// When set, the dispatcher will track which tests were seen during the run
    /// and emit a `TestsNotSeen` as part of the `RunFinished` if some expected
    /// tests were not seen.
    pub fn set_expected_outstanding(
        &mut self,
        expected: BTreeSet<OwnedTestInstanceId>,
    ) -> &mut Self {
        self.expected_outstanding = Some(expected);
        self
    }

    /// Sets version-related environment variables for tests and setup scripts.
    pub fn set_version_env_vars(&mut self, version_env_vars: VersionEnvVars) -> &mut Self {
        self.version_env_vars = Some(version_env_vars);
        self
    }

    /// Creates a new test runner.
    ///
    /// Unset builder fields fall back to the profile's values. Consumes the
    /// builder.
    ///
    /// # Errors
    ///
    /// Returns an error if the Tokio runtime or the signal handler could not
    /// be created.
    #[expect(clippy::too_many_arguments)]
    pub fn build<'a>(
        self,
        test_list: &'a TestList,
        profile: &'a EvaluatableProfile<'a>,
        cli_args: Vec<String>,
        signal_handler: SignalHandlerKind,
        input_handler: InputHandlerKind,
        double_spawn: DoubleSpawnInfo,
        target_runner: TargetRunner,
    ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
        // CaptureStrategy::None forces serial execution (see
        // `set_capture_strategy`); otherwise use the explicit override or the
        // profile's configured parallelism.
        let test_threads = match self.capture_strategy {
            CaptureStrategy::None => 1,
            CaptureStrategy::Combined | CaptureStrategy::Split => self
                .test_threads
                .unwrap_or_else(|| profile.test_threads())
                .compute(),
        };
        let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .thread_name("nextest-runner-worker")
            .build()
            .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
        // Enter the runtime context; the guard must stay alive for the
        // handler construction below.
        let _guard = runtime.enter();

        // signal_handler.build() must be called from within the guard.
        let signal_handler = signal_handler.build()?;

        let input_handler = input_handler.build();

        Ok(TestRunner {
            inner: TestRunnerInner {
                run_id: force_or_new_run_id(),
                started_at: Local::now(),
                profile,
                test_list,
                test_threads,
                double_spawn,
                target_runner,
                capture_strategy: self.capture_strategy,
                force_retries: self.retries,
                force_flaky_result: self.flaky_result,
                cli_args,
                max_fail,
                stress_condition: self.stress_condition,
                interceptor: self.interceptor,
                expected_outstanding: self.expected_outstanding,
                version_env_vars: self.version_env_vars,
                runtime,
            },
            signal_handler,
            input_handler,
        })
    }
}
402
/// Stress testing condition.
///
/// Determines how long the runner keeps repeating sub-runs.
#[derive(Clone, Debug)]
pub enum StressCondition {
    /// Run each test `count` times.
    Count(StressCount),

    /// Run until this duration has elapsed.
    Duration(Duration),
}
412
/// A count for stress testing.
///
/// Parsed from a string via the [`FromStr`] impl below: the literal
/// `"infinite"`, or a positive integer count.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
pub enum StressCount {
    /// Run each test `count` times.
    Count {
        /// The number of times to run each test.
        count: NonZero<u32>,
    },

    /// Run indefinitely.
    Infinite,
}
427
428impl FromStr for StressCount {
429    type Err = StressCountParseError;
430
431    fn from_str(s: &str) -> Result<Self, Self::Err> {
432        if s == "infinite" {
433            Ok(StressCount::Infinite)
434        } else {
435            match s.parse() {
436                Ok(count) => Ok(StressCount::Count { count }),
437                Err(_) => Err(StressCountParseError::new(s)),
438            }
439        }
440    }
441}
442
/// Context for running tests.
///
/// Created using [`TestRunnerBuilder::build`].
#[derive(Debug)]
pub struct TestRunner<'a> {
    /// Run state, including the Tokio runtime that drives execution.
    inner: TestRunnerInner<'a>,
    /// Handler for signals received during the run.
    signal_handler: SignalHandler,
    /// Handler for interactive input during the run.
    input_handler: InputHandler,
}
452
impl<'a> TestRunner<'a> {
    /// Returns the unique ID for this test run.
    pub fn run_id(&self) -> ReportUuid {
        self.inner.run_id
    }

    /// Returns the timestamp when this test run was started.
    pub fn started_at(&self) -> DateTime<Local> {
        self.inner.started_at
    }

    /// Returns the status of the input handler.
    pub fn input_handler_status(&self) -> InputHandlerStatus {
        self.input_handler.status()
    }

    /// Executes the listed tests, each one in its own process.
    ///
    /// The callback is called with the results of each test.
    ///
    /// Returns an error if any of the tasks panicked.
    pub fn execute<F>(
        self,
        mut callback: F,
    ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        // Delegate to try_execute with an infallible callback: the only
        // possible errors are join errors from runner tasks.
        self.try_execute::<Infallible, _>(|event| {
            callback(event);
            Ok(())
        })
    }

    /// Executes the listed tests, each one in its own process.
    ///
    /// Accepts a callback that is called with the results of each test. If the callback returns an
    /// error, the test run terminates and the callback is no longer called.
    ///
    /// Returns an error if any of the tasks panicked.
    pub fn try_execute<E, F>(
        mut self,
        mut callback: F,
    ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
    where
        F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
        E: fmt::Debug + Send,
    {
        // One-shot channel used to ask the runner to wind down if the
        // reporting callback fails.
        let (report_cancel_tx, report_cancel_rx) = oneshot::channel();

        // If report_cancel_tx is None, at least one error has occurred and the
        // runner has been instructed to shut down. first_error is also set to
        // Some in that case.
        let mut report_cancel_tx = Some(report_cancel_tx);
        let mut first_error = None;

        let res = self.inner.execute(
            &mut self.signal_handler,
            &mut self.input_handler,
            report_cancel_rx,
            |event| {
                match callback(event) {
                    Ok(()) => {}
                    Err(error) => {
                        // If the callback fails, we need to let the runner know to start shutting
                        // down. But we keep reporting results in case the callback starts working
                        // again.
                        if let Some(report_cancel_tx) = report_cancel_tx.take() {
                            let _ = report_cancel_tx.send(());
                            first_error = Some(error);
                        }
                    }
                }
            },
        );

        // On Windows, the stdout and stderr futures might spawn processes that keep the runner
        // stuck indefinitely if it's dropped the normal way. Shut it down aggressively, being OK
        // with leaked resources.
        self.inner.runtime.shutdown_background();

        // Combine the run result with the first callback error: either alone
        // is an error, and both are reported together if both occurred.
        match (res, first_error) {
            (Ok(run_stats), None) => Ok(run_stats),
            (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
                report_error: Some(report_error),
                join_errors: Vec::new(),
            }),
            (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
                report_error,
                join_errors,
            }),
        }
    }
}
547
/// Internal state for a test run, owned by [`TestRunner`].
#[derive(Debug)]
struct TestRunnerInner<'a> {
    /// Unique ID for this run.
    run_id: ReportUuid,
    /// When the run started.
    started_at: DateTime<Local>,
    /// The profile in effect for this run.
    profile: &'a EvaluatableProfile<'a>,
    /// The tests to run.
    test_list: &'a TestList<'a>,
    /// Computed parallelism (1 if capture is disabled).
    test_threads: usize,
    double_spawn: DoubleSpawnInfo,
    target_runner: TargetRunner,
    capture_strategy: CaptureStrategy,
    /// Retry policy override from the builder, if set.
    force_retries: Option<RetryPolicy>,
    /// Flaky-result override from the builder, if set.
    force_flaky_result: Option<FlakyResult>,
    cli_args: Vec<String>,
    max_fail: MaxFail,
    /// If set, the run is repeated as stress sub-runs.
    stress_condition: Option<StressCondition>,
    /// Debugger/tracer wrapper for test execution.
    interceptor: Interceptor,
    /// Expected tests for rerun tracking, if set.
    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
    /// Version env vars to expose to tests/scripts, if set.
    version_env_vars: Option<VersionEnvVars>,
    /// The Tokio runtime all runner tasks execute on.
    runtime: Runtime,
}
568
impl<'a> TestRunnerInner<'a> {
    /// Drives the full test run, returning aggregate statistics.
    ///
    /// With a stress condition set, repeats sub-runs until the condition is
    /// exhausted or the run is cancelled; otherwise performs a single run.
    fn execute<F>(
        &self,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: oneshot::Receiver<()>,
        callback: F,
    ) -> Result<RunStats, Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        // TODO: add support for other test-running approaches, measure performance.

        // Disable the global timeout when an interceptor is active.
        let global_timeout = if self.interceptor.should_disable_timeouts() {
            crate::time::far_future_duration()
        } else {
            self.profile.global_timeout(self.test_list.mode()).period
        };

        let mut dispatcher_cx = DispatcherContext::new(
            callback,
            self.run_id,
            self.profile.name(),
            self.cli_args.clone(),
            self.test_list.run_count(),
            self.max_fail,
            global_timeout,
            self.stress_condition.clone(),
            self.expected_outstanding.clone(),
        );

        let executor_cx = ExecutorContext::new(
            self.run_id,
            self.profile,
            self.test_list,
            self.test_threads,
            self.double_spawn.clone(),
            self.target_runner.clone(),
            self.capture_strategy,
            self.force_retries,
            self.force_flaky_result,
            self.interceptor.clone(),
            self.version_env_vars.clone(),
        );

        // Send the initial event.
        dispatcher_cx.run_started(self.test_list, self.test_threads);

        let _guard = self.runtime.enter();

        // Fuse and pin the cancellation receiver so it can be polled across
        // multiple sub-runs without being consumed.
        let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());

        if self.stress_condition.is_some() {
            // Stress mode: repeat sub-runs until the stress progress reports
            // nothing remaining, or the run is cancelled.
            loop {
                let progress = dispatcher_cx
                    .stress_progress()
                    .expect("stress_condition is Some => stress progress is Some");
                if progress.remaining().is_some() {
                    dispatcher_cx.stress_sub_run_started(progress);

                    self.do_run(
                        dispatcher_cx.stress_index(),
                        &mut dispatcher_cx,
                        &executor_cx,
                        signal_handler,
                        input_handler,
                        report_cancel_rx.as_mut(),
                    )?;

                    dispatcher_cx.stress_sub_run_finished();

                    if dispatcher_cx.cancel_reason().is_some() {
                        break;
                    }
                } else {
                    break;
                }
            }
        } else {
            self.do_run(
                None,
                &mut dispatcher_cx,
                &executor_cx,
                signal_handler,
                input_handler,
                report_cancel_rx,
            )?;
        }

        let run_stats = dispatcher_cx.run_stats();
        dispatcher_cx.run_finished();

        Ok(run_stats)
    }

    /// Performs a single (sub-)run: spawns the dispatcher, the setup-script
    /// task, and one task per test inside a single async-scoped block, then
    /// collects any join errors.
    ///
    /// `stress_index` is `Some` for stress sub-runs and `None` for a regular
    /// run.
    fn do_run<F>(
        &self,
        stress_index: Option<StressIndex>,
        dispatcher_cx: &mut DispatcherContext<'a, F>,
        executor_cx: &ExecutorContext<'a>,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
    ) -> Result<(), Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // Executor tasks report back to the dispatcher over this channel.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Run the dispatcher to completion in a task.
            let dispatcher_fut =
                dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                // Since script tasks are run serially, we just reuse the one
                // script task.
                let script_data = executor_cx
                    .run_setup_scripts(stress_index, script_resp_tx)
                    .await;
                if script_tx.send(script_data).is_err() {
                    // The dispatcher has shut down, so we should too.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block until setup scripts have finished: tests must not start
            // before the script data is available.
            let Some(script_data) = script_rx.blocking_recv() else {
                // Most likely the harness is shutting down, so we should too.
                debug!("no script data received, shutting down");
                return;
            };

            // groups is going to be passed to future_queue_grouped.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    // Filter tests before assigning a FutureQueueContext to
                    // them.
                    //
                    // Note that this function is called lazily due to the
                    // `future_queue_grouped` below. This means that skip
                    // notifications will go out as tests are iterated over, not
                    // all at once.
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // Failure to send means the receiver was dropped.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                stress_index,
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        // Use a separate Tokio task for each test. For repos
                        // with lots of small tests, this has been observed to
                        // be much faster than using a single task for all tests
                        // (what we used to do). It also provides some degree of
                        // per-test isolation.
                        async move {
                            // SAFETY: Within an outer scope_and_block (which we
                            // have here), scope_and_collect is safe as long as
                            // the returned future isn't forgotten. We're not
                            // forgetting it below -- we're running it to
                            // completion immediately.
                            //
                            // But recursive scoped calls really feel like
                            // pushing against the limits of async-scoped. For
                            // example, there's no way built into async-scoped
                            // to propagate a cancellation signal from the outer
                            // scope to the inner scope. (But there could be,
                            // right? That seems solvable via channels. And we
                            // could likely do our own channels here.)
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx.run_test_instance(
                                        stress_index,
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // If no future was started, that's really strange.
                            // Worth at least logging.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                // future_queue_grouped means tests are spawned in the order
                // defined, but returned in any order.
                .future_queue_grouped(self.test_threads, groups)
                // Drop the None values.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                // Interestingly, using a more idiomatic `async move {
                // run_tests_fut.await ... }` block causes Rust 1.83 to complain
                // about a weird lifetime mismatch. FutureExt::map as used below
                // does not.
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        // Were there any join errors in tasks?
        //
        // If one of the tasks panics, we likely end up stuck because the
        // dispatcher, which is spawned in the same async-scoped block, doesn't
        // get relayed the panic immediately. That should probably be fixed at
        // some point.
        let mut cancelled_count = 0;
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    // Largely ignore cancelled tasks since it most likely means
                    // shutdown -- we don't cancel tasks manually.
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }

        Ok(())
    }
}
857
/// Configures stdout, stdin and stderr inheritance by test processes on Windows.
///
/// With Rust on Windows, these handles can be held open by tests (and therefore by grandchild processes)
/// even if we run the tests with `Stdio::inherit`. This can cause problems with leaky tests.
///
/// This changes global state on the Win32 side, so the application must manage mutual exclusion
/// around it. Call this right before [`TestRunner::try_execute`].
///
/// This is a no-op on non-Windows platforms.
///
/// See [this issue on the Rust repository](https://github.com/rust-lang/rust/issues/54760) for more
/// discussion.
pub fn configure_handle_inheritance(
    no_capture: bool,
) -> Result<(), ConfigureHandleInheritanceError> {
    // Delegate to the platform-specific implementation in `super::os`; per the
    // doc comment above, the non-Windows implementation is a no-op.
    super::os::configure_handle_inheritance_impl(no_capture)
}
875
/// Environment variable to force a specific run ID (for testing).
///
/// The value must parse as a UUID; an invalid value is ignored with a warning
/// and a random run ID is generated instead (see `force_or_new_run_id`).
const FORCE_RUN_ID_ENV: &str = "__NEXTEST_FORCE_RUN_ID";
878
879/// Returns a forced run ID from the environment, or generates a new one.
880fn force_or_new_run_id() -> ReportUuid {
881    if let Ok(id_str) = std::env::var(FORCE_RUN_ID_ENV) {
882        match id_str.parse::<ReportUuid>() {
883            Ok(uuid) => return uuid,
884            Err(err) => {
885                warn!(
886                    "{FORCE_RUN_ID_ENV} is set but invalid (expected UUID): {err}, \
887                     generating random ID"
888                );
889            }
890        }
891    }
892    ReportUuid::new_v4()
893}
894
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    #[test]
    fn no_capture_settings() {
        // With capture disabled, the runner must ignore the requested thread
        // count and fall back to serial execution.
        let mut builder = TestRunnerBuilder::default();
        builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));

        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let config = NextestConfig::default_config("/fake/dir");
        let profile = config
            .profile(NextestConfig::DEFAULT_PROFILE)
            .unwrap()
            .apply_build_platforms(&build_platforms);
        let test_list = TestList::empty();

        let runner = builder
            .build(
                &test_list,
                &profile,
                vec![],
                SignalHandlerKind::Noop,
                InputHandlerKind::Noop,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();
        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }

    #[test]
    fn test_debugger_command_parsing() {
        // Valid inputs: (command line, expected program, expected args).
        let valid: &[(&str, &str, &[&str])] = &[
            ("gdb --args", "gdb", &["--args"]),
            ("rust-gdb -ex run --args", "rust-gdb", &["-ex", "run", "--args"]),
            (
                // Quoted arguments are kept as a single part.
                r#"gdb -ex "set print pretty on" --args"#,
                "gdb",
                &["-ex", "set print pretty on", "--args"],
            ),
        ];
        for (input, program, args) in valid {
            let cmd = DebuggerCommand::from_str(input).unwrap();
            assert_eq!(cmd.program(), *program);
            assert_eq!(cmd.args(), *args);
        }

        // Empty and whitespace-only commands are rejected.
        for input in ["", "   "] {
            let err = DebuggerCommand::from_str(input).unwrap_err();
            assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));
        }
    }
}