// nextest_runner/runner/imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6    config::{
7        core::EvaluatableProfile,
8        elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9        scripts::SetupScriptExecuteData,
10    },
11    double_spawn::DoubleSpawnInfo,
12    errors::{
13        ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14        TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15    },
16    input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17    list::{OwnedTestInstanceId, TestInstanceWithSettings, TestList},
18    reporter::events::{ReporterEvent, RunStats, StressIndex},
19    runner::ExecutorEvent,
20    signal::{SignalHandler, SignalHandlerKind},
21    target_runner::TargetRunner,
22    test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use chrono::{DateTime, Local};
26use future_queue::{FutureQueueContext, StreamExt};
27use futures::{future::Fuse, prelude::*};
28use nextest_metadata::FilterMatch;
29use quick_junit::ReportUuid;
30use semver::Version;
31use std::{
32    collections::BTreeSet, convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr,
33    sync::Arc, time::Duration,
34};
35use tokio::{
36    runtime::Runtime,
37    sync::{mpsc::unbounded_channel, oneshot},
38    task::JoinError,
39};
40use tracing::{debug, warn};
41
/// A parsed debugger command.
#[derive(Clone, Debug)]
pub struct DebuggerCommand {
    // The debugger executable: the first shell word of the parsed command.
    program: String,
    // The remaining shell words, passed as arguments to the debugger.
    args: Vec<String>,
}
48
49impl DebuggerCommand {
50    /// Gets the program.
51    pub fn program(&self) -> &str {
52        // The from_str constructor ensures that there is at least one part.
53        &self.program
54    }
55
56    /// Gets the arguments.
57    pub fn args(&self) -> &[String] {
58        &self.args
59    }
60}
61
62impl FromStr for DebuggerCommand {
63    type Err = DebuggerCommandParseError;
64
65    fn from_str(command: &str) -> Result<Self, Self::Err> {
66        let mut parts =
67            shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
68        if parts.is_empty() {
69            return Err(DebuggerCommandParseError::EmptyCommand);
70        }
71        let program = parts.remove(0);
72        Ok(Self {
73            program,
74            args: parts,
75        })
76    }
77}
78
/// A parsed tracer command.
#[derive(Clone, Debug)]
pub struct TracerCommand {
    // The tracer executable: the first shell word of the parsed command.
    program: String,
    // The remaining shell words, passed as arguments to the tracer.
    args: Vec<String>,
}
85
86impl TracerCommand {
87    /// Gets the program.
88    pub fn program(&self) -> &str {
89        &self.program
90    }
91
92    /// Gets the arguments.
93    pub fn args(&self) -> &[String] {
94        &self.args
95    }
96}
97
98impl FromStr for TracerCommand {
99    type Err = TracerCommandParseError;
100
101    fn from_str(command: &str) -> Result<Self, Self::Err> {
102        let mut parts =
103            shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
104        if parts.is_empty() {
105            return Err(TracerCommandParseError::EmptyCommand);
106        }
107        let program = parts.remove(0);
108        Ok(Self {
109            program,
110            args: parts,
111        })
112    }
113}
114
/// An interceptor wraps test execution with a debugger or tracer.
///
/// The `should_*` methods on this type control how child test processes are
/// spawned and supervised when an interceptor is active.
#[derive(Clone, Debug, Default)]
pub enum Interceptor {
    /// No interceptor - standard test execution. This is the default.
    #[default]
    None,

    /// Run the test under a debugger.
    Debugger(DebuggerCommand),

    /// Run the test under a syscall tracer.
    Tracer(TracerCommand),
}
128
129impl Interceptor {
130    /// Returns true if timeouts should be disabled.
131    ///
132    /// Both debuggers and tracers disable timeouts.
133    pub fn should_disable_timeouts(&self) -> bool {
134        match self {
135            Interceptor::None => false,
136            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
137        }
138    }
139
140    /// Returns true if stdin should be passed through to child test processes.
141    ///
142    /// Only debuggers need stdin passthrough for interactive debugging.
143    pub fn should_passthrough_stdin(&self) -> bool {
144        match self {
145            Interceptor::None | Interceptor::Tracer(_) => false,
146            Interceptor::Debugger(_) => true,
147        }
148    }
149
150    /// Returns true if a process group should be created for the child.
151    ///
152    /// Debuggers need terminal control, so no process group is created. Tracers
153    /// work fine with process groups.
154    pub fn should_create_process_group(&self) -> bool {
155        match self {
156            Interceptor::None | Interceptor::Tracer(_) => true,
157            Interceptor::Debugger(_) => false,
158        }
159    }
160
161    /// Returns true if leak detection should be skipped.
162    ///
163    /// Both debuggers and tracers skip leak detection to avoid interference.
164    pub fn should_skip_leak_detection(&self) -> bool {
165        match self {
166            Interceptor::None => false,
167            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
168        }
169    }
170
171    /// Returns true if the test command should be displayed.
172    ///
173    /// Used to determine if we should print the wrapper command for debugging.
174    pub fn should_show_wrapper_command(&self) -> bool {
175        match self {
176            Interceptor::None => false,
177            Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
178        }
179    }
180
181    /// Returns true if, on receiving SIGTSTP, we should send SIGTSTP to the
182    /// child.
183    ///
184    /// Debugger mode has special signal handling where we don't send SIGTSTP to
185    /// the child (it receives it directly from the terminal, since no process
186    /// group is created). Tracers use standard signal handling.
187    pub fn should_send_sigtstp(&self) -> bool {
188        match self {
189            Interceptor::None | Interceptor::Tracer(_) => true,
190            Interceptor::Debugger(_) => false,
191        }
192    }
193}
194
/// Version-related environment variables set for tests and setup scripts.
///
/// These expose the current nextest version and any version constraints from
/// the repository's configuration. They are applied to commands as
/// `NEXTEST_VERSION`, `NEXTEST_REQUIRED_VERSION` and
/// `NEXTEST_RECOMMENDED_VERSION` (see `apply_env`).
#[derive(Clone, Debug)]
pub struct VersionEnvVars {
    /// The current nextest version.
    pub current_version: Version,

    /// The required nextest version from configuration, if any.
    pub required_version: Option<Version>,

    /// The recommended nextest version from configuration, if any.
    pub recommended_version: Option<Version>,
}
210
211impl VersionEnvVars {
212    /// Applies the version environment variables to a command.
213    pub(super) fn apply_env(&self, cmd: &mut std::process::Command) {
214        cmd.env("NEXTEST_VERSION", self.current_version.to_string());
215        cmd.env(
216            "NEXTEST_REQUIRED_VERSION",
217            match &self.required_version {
218                Some(v) => v.to_string(),
219                None => "none".to_owned(),
220            },
221        );
222        cmd.env(
223            "NEXTEST_RECOMMENDED_VERSION",
224            match &self.recommended_version {
225                Some(v) => v.to_string(),
226                None => "none".to_owned(),
227            },
228        );
229    }
230}
231
/// A child process identifier: either a single process or a process group.
#[derive(Copy, Clone, Debug)]
pub(super) enum ChildPid {
    /// A single process ID.
    ///
    /// The value is only read on Unix (see `for_kill`); on other platforms the
    /// field is intentionally marked as unused.
    Process(#[cfg_attr(not(unix), expect(unused))] u32),

    /// A process group ID.
    #[cfg(unix)]
    ProcessGroup(u32),
}
242
243impl ChildPid {
244    /// Returns the PID value to use with `libc::kill`.
245    ///
246    /// - `Process(pid)` returns `pid as i32` (positive, kills single process).
247    /// - `ProcessGroup(pid)` returns `-(pid as i32)` (negative, kills process group).
248    ///
249    /// On Windows, always returns `pid as i32`.
250    #[cfg(unix)]
251    pub(super) fn for_kill(self) -> i32 {
252        match self {
253            ChildPid::Process(pid) => pid as i32,
254            ChildPid::ProcessGroup(pid) => -(pid as i32),
255        }
256    }
257}
258
/// Test runner options.
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    // How test output (stdout/stderr) is captured.
    capture_strategy: CaptureStrategy,
    // Retry policy override; passed to the executor as `force_retries`.
    retries: Option<RetryPolicy>,
    // Overrides the profile's max-fail setting when set.
    max_fail: Option<MaxFail>,
    // Overrides the profile's test-thread count when set.
    test_threads: Option<TestThreads>,
    // Stress testing condition (repeat count or duration), if any.
    stress_condition: Option<StressCondition>,
    // Debugger or tracer wrapping test execution; defaults to none.
    interceptor: Interceptor,
    // Tests expected to run, used for rerun tracking.
    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
    // Version info exposed to tests via environment variables.
    version_env_vars: Option<VersionEnvVars>,
}
271
impl TestRunnerBuilder {
    /// Sets the capture strategy for the test runner
    ///
    /// * [`CaptureStrategy::Split`]
    ///   * pro: output from `stdout` and `stderr` can be identified and easily split
    ///   * con: ordering between the streams cannot be guaranteed
    /// * [`CaptureStrategy::Combined`]
    ///   * pro: output is guaranteed to be ordered as it would in a terminal emulator
    ///   * con: distinction between `stdout` and `stderr` is lost
    /// * [`CaptureStrategy::None`] -
    ///   * In this mode, tests will always be run serially: `test_threads` will always be 1.
    pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
        self.capture_strategy = strategy;
        self
    }

    /// Sets the number of retries for this test runner.
    pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
        self.retries = Some(retries);
        self
    }

    /// Sets the max-fail value for this test runner.
    pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
        self.max_fail = Some(max_fail);
        self
    }

    /// Sets the number of tests to run simultaneously.
    pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
        self.test_threads = Some(test_threads);
        self
    }

    /// Sets the stress testing condition.
    pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
        self.stress_condition = Some(stress_condition);
        self
    }

    /// Sets the interceptor (debugger or tracer) to use for running tests.
    pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
        self.interceptor = interceptor;
        self
    }

    /// Sets the expected outstanding tests for rerun tracking.
    ///
    /// When set, the dispatcher will track which tests were seen during the run
    /// and emit a `TestsNotSeen` as part of the `RunFinished` if some expected
    /// tests were not seen.
    pub fn set_expected_outstanding(
        &mut self,
        expected: BTreeSet<OwnedTestInstanceId>,
    ) -> &mut Self {
        self.expected_outstanding = Some(expected);
        self
    }

    /// Sets version-related environment variables for tests and setup scripts.
    pub fn set_version_env_vars(&mut self, version_env_vars: VersionEnvVars) -> &mut Self {
        self.version_env_vars = Some(version_env_vars);
        self
    }

    /// Creates a new test runner.
    ///
    /// Builds the Tokio runtime and the signal/input handlers, and resolves
    /// builder overrides against the profile's defaults.
    #[expect(clippy::too_many_arguments)]
    pub fn build<'a>(
        self,
        test_list: &'a TestList,
        profile: &'a EvaluatableProfile<'a>,
        cli_args: Vec<String>,
        signal_handler: SignalHandlerKind,
        input_handler: InputHandlerKind,
        double_spawn: DoubleSpawnInfo,
        target_runner: TargetRunner,
    ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
        // CaptureStrategy::None forces serial execution; otherwise fall back
        // to the profile's setting if the builder didn't override it.
        let test_threads = match self.capture_strategy {
            CaptureStrategy::None => 1,
            CaptureStrategy::Combined | CaptureStrategy::Split => self
                .test_threads
                .unwrap_or_else(|| profile.test_threads())
                .compute(),
        };
        let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .thread_name("nextest-runner-worker")
            .build()
            .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
        // Enter the runtime so that handler construction below happens within
        // a runtime context.
        let _guard = runtime.enter();

        // signal_handler.build() must be called from within the guard.
        let signal_handler = signal_handler.build()?;

        let input_handler = input_handler.build();

        Ok(TestRunner {
            inner: TestRunnerInner {
                run_id: force_or_new_run_id(),
                started_at: Local::now(),
                profile,
                test_list,
                test_threads,
                double_spawn,
                target_runner,
                capture_strategy: self.capture_strategy,
                force_retries: self.retries,
                cli_args,
                max_fail,
                stress_condition: self.stress_condition,
                interceptor: self.interceptor,
                expected_outstanding: self.expected_outstanding,
                version_env_vars: self.version_env_vars,
                runtime,
            },
            signal_handler,
            input_handler,
        })
    }
}
394
/// Stress testing condition.
///
/// Determines when a stress run stops: after a fixed number of sub-runs, or
/// once a wall-clock duration has elapsed.
#[derive(Clone, Debug)]
pub enum StressCondition {
    /// Run each test `count` times.
    Count(StressCount),

    /// Run until this duration has elapsed.
    Duration(Duration),
}
404
/// A count for stress testing.
///
/// Parsed from strings via [`FromStr`]: the literal `"infinite"` maps to
/// [`StressCount::Infinite`], anything else must parse as a non-zero count.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
pub enum StressCount {
    /// Run each test `count` times.
    Count {
        /// The number of times to run each test.
        count: NonZero<u32>,
    },

    /// Run indefinitely.
    Infinite,
}
419
420impl FromStr for StressCount {
421    type Err = StressCountParseError;
422
423    fn from_str(s: &str) -> Result<Self, Self::Err> {
424        if s == "infinite" {
425            Ok(StressCount::Infinite)
426        } else {
427            match s.parse() {
428                Ok(count) => Ok(StressCount::Count { count }),
429                Err(_) => Err(StressCountParseError::new(s)),
430            }
431        }
432    }
433}
434
/// Context for running tests.
///
/// Created using [`TestRunnerBuilder::build`].
#[derive(Debug)]
pub struct TestRunner<'a> {
    // Core runner state, including the Tokio runtime driving execution.
    inner: TestRunnerInner<'a>,
    // OS signal handling for the duration of the run.
    signal_handler: SignalHandler,
    // Interactive input handling for the duration of the run.
    input_handler: InputHandler,
}
444
impl<'a> TestRunner<'a> {
    /// Returns the unique ID for this test run.
    pub fn run_id(&self) -> ReportUuid {
        self.inner.run_id
    }

    /// Returns the timestamp when this test run was started.
    pub fn started_at(&self) -> DateTime<Local> {
        self.inner.started_at
    }

    /// Returns the status of the input handler.
    pub fn input_handler_status(&self) -> InputHandlerStatus {
        self.input_handler.status()
    }

    /// Executes the listed tests, each one in its own process.
    ///
    /// The callback is called with the results of each test.
    ///
    /// Returns an error if any of the tasks panicked.
    ///
    /// This is a convenience wrapper around [`Self::try_execute`] for
    /// callbacks that cannot fail.
    pub fn execute<F>(
        self,
        mut callback: F,
    ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        self.try_execute::<Infallible, _>(|event| {
            callback(event);
            Ok(())
        })
    }

    /// Executes the listed tests, each one in its own process.
    ///
    /// Accepts a callback that is called with the results of each test. If the callback returns an
    /// error, the test run terminates and the callback is no longer called.
    ///
    /// Returns an error if any of the tasks panicked.
    pub fn try_execute<E, F>(
        mut self,
        mut callback: F,
    ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
    where
        F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
        E: fmt::Debug + Send,
    {
        // One-shot channel used to ask the runner to cancel the run if the
        // reporter callback fails.
        let (report_cancel_tx, report_cancel_rx) = oneshot::channel();

        // If report_cancel_tx is None, at least one error has occurred and the
        // runner has been instructed to shut down. first_error is also set to
        // Some in that case.
        let mut report_cancel_tx = Some(report_cancel_tx);
        let mut first_error = None;

        let res = self.inner.execute(
            &mut self.signal_handler,
            &mut self.input_handler,
            report_cancel_rx,
            |event| {
                match callback(event) {
                    Ok(()) => {}
                    Err(error) => {
                        // If the callback fails, we need to let the runner know to start shutting
                        // down. But we keep reporting results in case the callback starts working
                        // again.
                        if let Some(report_cancel_tx) = report_cancel_tx.take() {
                            let _ = report_cancel_tx.send(());
                            first_error = Some(error);
                        }
                    }
                }
            },
        );

        // On Windows, the stdout and stderr futures might spawn processes that keep the runner
        // stuck indefinitely if it's dropped the normal way. Shut it down aggressively, being OK
        // with leaked resources.
        self.inner.runtime.shutdown_background();

        // Surface the first callback error and/or any task join errors.
        match (res, first_error) {
            (Ok(run_stats), None) => Ok(run_stats),
            (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
                report_error: Some(report_error),
                join_errors: Vec::new(),
            }),
            (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
                report_error,
                join_errors,
            }),
        }
    }
}
539
/// Internal state for [`TestRunner`].
///
/// Owns the Tokio runtime and the configuration resolved by
/// [`TestRunnerBuilder::build`].
#[derive(Debug)]
struct TestRunnerInner<'a> {
    // Unique identifier for this run, reported in events.
    run_id: ReportUuid,
    // Wall-clock time at which the runner was built.
    started_at: DateTime<Local>,
    profile: &'a EvaluatableProfile<'a>,
    test_list: &'a TestList<'a>,
    // Number of tests to run concurrently (1 when capture is disabled).
    test_threads: usize,
    double_spawn: DoubleSpawnInfo,
    target_runner: TargetRunner,
    capture_strategy: CaptureStrategy,
    // Retry policy override from the builder, if any.
    force_retries: Option<RetryPolicy>,
    cli_args: Vec<String>,
    max_fail: MaxFail,
    stress_condition: Option<StressCondition>,
    interceptor: Interceptor,
    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
    version_env_vars: Option<VersionEnvVars>,
    // Runtime driving all runner tasks; shut down aggressively in try_execute.
    runtime: Runtime,
}
559
impl<'a> TestRunnerInner<'a> {
    /// Runs the whole test session: emits the run-started event, executes one
    /// pass (or repeated stress sub-runs), and emits the run-finished event.
    ///
    /// Returns the accumulated run statistics, or the join errors of any
    /// panicked tasks.
    fn execute<F>(
        &self,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: oneshot::Receiver<()>,
        callback: F,
    ) -> Result<RunStats, Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        // TODO: add support for other test-running approaches, measure performance.

        // Disable the global timeout when an interceptor is active.
        let global_timeout = if self.interceptor.should_disable_timeouts() {
            crate::time::far_future_duration()
        } else {
            self.profile.global_timeout(self.test_list.mode()).period
        };

        let mut dispatcher_cx = DispatcherContext::new(
            callback,
            self.run_id,
            self.profile.name(),
            self.cli_args.clone(),
            self.test_list.run_count(),
            self.max_fail,
            global_timeout,
            self.stress_condition.clone(),
            self.expected_outstanding.clone(),
        );

        let executor_cx = ExecutorContext::new(
            self.run_id,
            self.profile,
            self.test_list,
            self.test_threads,
            self.double_spawn.clone(),
            self.target_runner.clone(),
            self.capture_strategy,
            self.force_retries,
            self.interceptor.clone(),
            self.version_env_vars.clone(),
        );

        // Send the initial event.
        dispatcher_cx.run_started(self.test_list, self.test_threads);

        // Enter the runtime so do_run's scoped tasks can be spawned on it.
        let _guard = self.runtime.enter();

        // Fused and pinned once, so stress sub-runs can share it via as_mut().
        let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());

        if self.stress_condition.is_some() {
            // Stress mode: keep running sub-runs until the stress condition is
            // exhausted or the run is cancelled.
            loop {
                let progress = dispatcher_cx
                    .stress_progress()
                    .expect("stress_condition is Some => stress progress is Some");
                if progress.remaining().is_some() {
                    dispatcher_cx.stress_sub_run_started(progress);

                    self.do_run(
                        dispatcher_cx.stress_index(),
                        &mut dispatcher_cx,
                        &executor_cx,
                        signal_handler,
                        input_handler,
                        report_cancel_rx.as_mut(),
                    )?;

                    dispatcher_cx.stress_sub_run_finished();

                    if dispatcher_cx.cancel_reason().is_some() {
                        break;
                    }
                } else {
                    break;
                }
            }
        } else {
            self.do_run(
                None,
                &mut dispatcher_cx,
                &executor_cx,
                signal_handler,
                input_handler,
                report_cancel_rx,
            )?;
        }

        let run_stats = dispatcher_cx.run_stats();
        dispatcher_cx.run_finished();

        Ok(run_stats)
    }

    /// Runs a single pass over the test list: spawns the dispatcher, the
    /// setup-script task, and one task per test, all within one scoped block.
    ///
    /// `stress_index` is `Some` only during stress sub-runs.
    fn do_run<F>(
        &self,
        stress_index: Option<StressIndex>,
        dispatcher_cx: &mut DispatcherContext<'a, F>,
        executor_cx: &ExecutorContext<'a>,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
    ) -> Result<(), Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // Channel through which executor tasks report events to the
            // dispatcher.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Run the dispatcher to completion in a task.
            let dispatcher_fut =
                dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                // Since script tasks are run serially, we just reuse the one
                // script task.
                let script_data = executor_cx
                    .run_setup_scripts(stress_index, script_resp_tx)
                    .await;
                if script_tx.send(script_data).is_err() {
                    // The dispatcher has shut down, so we should too.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block until setup scripts have finished before starting tests.
            let Some(script_data) = script_rx.blocking_recv() else {
                // Most likely the harness is shutting down, so we should too.
                debug!("no script data received, shutting down");
                return;
            };

            // groups is going to be passed to future_queue_grouped.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    // Filter tests before assigning a FutureQueueContext to
                    // them.
                    //
                    // Note that this function is called lazily due to the
                    // `future_queue_grouped` below. This means that skip
                    // notifications will go out as tests are iterated over, not
                    // all at once.
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // Failure to send means the receiver was dropped.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                stress_index,
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        // Use a separate Tokio task for each test. For repos
                        // with lots of small tests, this has been observed to
                        // be much faster than using a single task for all tests
                        // (what we used to do). It also provides some degree of
                        // per-test isolation.
                        async move {
                            // SAFETY: Within an outer scope_and_block (which we
                            // have here), scope_and_collect is safe as long as
                            // the returned future isn't forgotten. We're not
                            // forgetting it below -- we're running it to
                            // completion immediately.
                            //
                            // But recursive scoped calls really feel like
                            // pushing against the limits of async-scoped. For
                            // example, there's no way built into async-scoped
                            // to propagate a cancellation signal from the outer
                            // scope to the inner scope. (But there could be,
                            // right? That seems solvable via channels. And we
                            // could likely do our own channels here.)
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx.run_test_instance(
                                        stress_index,
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // If no future was started, that's really strange.
                            // Worth at least logging.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                // future_queue_grouped means tests are spawned in the order
                // defined, but returned in any order.
                .future_queue_grouped(self.test_threads, groups)
                // Drop the None values.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                // Interestingly, using a more idiomatic `async move {
                // run_tests_fut.await ... }` block causes Rust 1.83 to complain
                // about a weird lifetime mismatch. FutureExt::map as used below
                // does not.
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        // Were there any join errors in tasks?
        //
        // If one of the tasks panics, we likely end up stuck because the
        // dispatcher, which is spawned in the same async-scoped block, doesn't
        // get relayed the panic immediately. That should probably be fixed at
        // some point.
        let mut cancelled_count = 0;
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    // Largely ignore cancelled tasks since it most likely means
                    // shutdown -- we don't cancel tasks manually.
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }

        Ok(())
    }
}
847
848/// Configures stdout, stdin and stderr inheritance by test processes on Windows.
849///
850/// With Rust on Windows, these handles can be held open by tests (and therefore by grandchild processes)
851/// even if we run the tests with `Stdio::inherit`. This can cause problems with leaky tests.
852///
853/// This changes global state on the Win32 side, so the application must manage mutual exclusion
854/// around it. Call this right before [`TestRunner::try_execute`].
855///
856/// This is a no-op on non-Windows platforms.
857///
858/// See [this issue on the Rust repository](https://github.com/rust-lang/rust/issues/54760) for more
859/// discussion.
860pub fn configure_handle_inheritance(
861    no_capture: bool,
862) -> Result<(), ConfigureHandleInheritanceError> {
863    super::os::configure_handle_inheritance_impl(no_capture)
864}
865
866/// Environment variable to force a specific run ID (for testing).
867const FORCE_RUN_ID_ENV: &str = "__NEXTEST_FORCE_RUN_ID";
868
869/// Returns a forced run ID from the environment, or generates a new one.
870fn force_or_new_run_id() -> ReportUuid {
871    if let Ok(id_str) = std::env::var(FORCE_RUN_ID_ENV) {
872        match id_str.parse::<ReportUuid>() {
873            Ok(uuid) => return uuid,
874            Err(err) => {
875                warn!(
876                    "{FORCE_RUN_ID_ENV} is set but invalid (expected UUID): {err}, \
877                     generating random ID"
878                );
879            }
880        }
881    }
882    ReportUuid::new_v4()
883}
884
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    #[test]
    fn no_capture_settings() {
        // With capture disabled, the requested thread count must be ignored
        // and tests forced to run serially.
        let mut runner_builder = TestRunnerBuilder::default();
        runner_builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));

        let empty_list = TestList::empty();
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let resolved_profile = NextestConfig::default_config("/fake/dir")
            .profile(NextestConfig::DEFAULT_PROFILE)
            .unwrap()
            .apply_build_platforms(&build_platforms);

        let runner = runner_builder
            .build(
                &empty_list,
                &resolved_profile,
                vec![],
                SignalHandlerKind::Noop,
                InputHandlerKind::Noop,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();

        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }

    #[test]
    fn test_debugger_command_parsing() {
        // A program with a single argument.
        let parsed = DebuggerCommand::from_str("gdb --args").unwrap();
        assert_eq!(parsed.program(), "gdb");
        assert_eq!(parsed.args(), &["--args"]);

        // Multiple arguments are preserved in order.
        let parsed = DebuggerCommand::from_str("rust-gdb -ex run --args").unwrap();
        assert_eq!(parsed.program(), "rust-gdb");
        assert_eq!(parsed.args(), &["-ex", "run", "--args"]);

        // Shell-style quoting keeps a quoted argument as one unit.
        let parsed = DebuggerCommand::from_str(r#"gdb -ex "set print pretty on" --args"#).unwrap();
        assert_eq!(parsed.program(), "gdb");
        assert_eq!(parsed.args(), &["-ex", "set print pretty on", "--args"]);

        // Empty and whitespace-only commands are both rejected.
        let parse_err = DebuggerCommand::from_str("").unwrap_err();
        assert!(matches!(parse_err, DebuggerCommandParseError::EmptyCommand));

        let parse_err = DebuggerCommand::from_str("   ").unwrap_err();
        assert!(matches!(parse_err, DebuggerCommandParseError::EmptyCommand));
    }
}