nextest_runner/runner/imp.rs

// Copyright (c) The nextest Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
use crate::{
    config::{
        core::EvaluatableProfile,
        elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
        scripts::SetupScriptExecuteData,
    },
    double_spawn::DoubleSpawnInfo,
    errors::{
        ConfigureHandleInheritanceError, StressCountParseError, TestRunnerBuildError,
        TestRunnerExecuteErrors,
    },
    input::{InputHandler, InputHandlerKind, InputHandlerStatus},
    list::{TestInstanceWithSettings, TestList},
    reporter::events::{RunStats, StressIndex, TestEvent},
    runner::ExecutorEvent,
    signal::{SignalHandler, SignalHandlerKind},
    target_runner::TargetRunner,
    test_output::CaptureStrategy,
};
use async_scoped::TokioScope;
use future_queue::{FutureQueueContext, StreamExt};
use futures::{future::Fuse, prelude::*};
use nextest_metadata::FilterMatch;
use quick_junit::ReportUuid;
use std::{
    convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr, sync::Arc, time::Duration,
};
use tokio::{
    runtime::Runtime,
    sync::{mpsc::unbounded_channel, oneshot},
    task::JoinError,
};
use tracing::{debug, warn};

/// Test runner options.
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    capture_strategy: CaptureStrategy,
    retries: Option<RetryPolicy>,
    max_fail: Option<MaxFail>,
    test_threads: Option<TestThreads>,
    stress_condition: Option<StressCondition>,
}

impl TestRunnerBuilder {
    /// Sets the capture strategy for the test runner.
    ///
    /// * [`CaptureStrategy::Split`]
    ///   * pro: output from `stdout` and `stderr` can be identified and easily split
    ///   * con: ordering between the streams cannot be guaranteed
    /// * [`CaptureStrategy::Combined`]
    ///   * pro: output is guaranteed to be ordered as it would be in a terminal emulator
    ///   * con: distinction between `stdout` and `stderr` is lost
    /// * [`CaptureStrategy::None`]
    ///   * In this mode, tests are always run serially: `test_threads` is always 1.
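    ///
    /// A minimal usage sketch (illustrative only; the rest of the builder
    /// setup is elided):
    ///
    /// ```ignore
    /// let mut builder = TestRunnerBuilder::default();
    /// // Split keeps stdout and stderr separate; Combined interleaves them.
    /// builder.set_capture_strategy(CaptureStrategy::Split);
    /// ```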
    pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
        self.capture_strategy = strategy;
        self
    }

    /// Sets the number of retries for this test runner.
    pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
        self.retries = Some(retries);
        self
    }

    /// Sets the max-fail value for this test runner.
    pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
        self.max_fail = Some(max_fail);
        self
    }

    /// Sets the number of tests to run simultaneously.
    pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
        self.test_threads = Some(test_threads);
        self
    }

    /// Sets the stress testing condition.
    pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
        self.stress_condition = Some(stress_condition);
        self
    }

    /// Creates a new test runner.
    #[expect(clippy::too_many_arguments)]
    pub fn build<'a>(
        self,
        test_list: &'a TestList,
        profile: &'a EvaluatableProfile<'a>,
        cli_args: Vec<String>,
        signal_handler: SignalHandlerKind,
        input_handler: InputHandlerKind,
        double_spawn: DoubleSpawnInfo,
        target_runner: TargetRunner,
    ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
        let test_threads = match self.capture_strategy {
            CaptureStrategy::None => 1,
            CaptureStrategy::Combined | CaptureStrategy::Split => self
                .test_threads
                .unwrap_or_else(|| profile.test_threads())
                .compute(),
        };
        let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .thread_name("nextest-runner-worker")
            .build()
            .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
        let _guard = runtime.enter();

        // signal_handler.build() must be called from within the guard.
        let signal_handler = signal_handler.build()?;

        let input_handler = input_handler.build();

        Ok(TestRunner {
            inner: TestRunnerInner {
                run_id: ReportUuid::new_v4(),
                profile,
                test_list,
                test_threads,
                double_spawn,
                target_runner,
                capture_strategy: self.capture_strategy,
                force_retries: self.retries,
                cli_args,
                max_fail,
                stress_condition: self.stress_condition,
                runtime,
            },
            signal_handler,
            input_handler,
        })
    }
}

/// Stress testing condition.
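///
/// Both variants can be constructed directly; a brief sketch:
///
/// ```ignore
/// // Run each test five times.
/// let by_count = StressCondition::Count(StressCount::Count(NonZero::new(5).unwrap()));
/// // Keep running sub-runs until a minute has elapsed.
/// let by_duration = StressCondition::Duration(Duration::from_secs(60));
/// ```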
#[derive(Clone, Debug)]
pub enum StressCondition {
    /// Run each test `count` times.
    Count(StressCount),

    /// Run until this duration has elapsed.
    Duration(Duration),
}

/// A count for stress testing.
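///
/// Values parse via the [`FromStr`] impl below: the literal `"infinite"` maps
/// to [`StressCount::Infinite`], and any nonzero `u32` maps to
/// [`StressCount::Count`]. A small sketch:
///
/// ```ignore
/// let five: StressCount = "5".parse().unwrap();
/// let forever: StressCount = "infinite".parse().unwrap();
/// ```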
#[derive(Clone, Copy, Debug)]
pub enum StressCount {
    /// Run each test `count` times.
    Count(NonZero<u32>),

    /// Run indefinitely.
    Infinite,
}

impl FromStr for StressCount {
    type Err = StressCountParseError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "infinite" {
            Ok(StressCount::Infinite)
        } else {
            match s.parse() {
                Ok(count) => Ok(StressCount::Count(count)),
                Err(_) => Err(StressCountParseError::new(s)),
            }
        }
    }
}

/// Context for running tests.
///
/// Created using [`TestRunnerBuilder::build`].
#[derive(Debug)]
pub struct TestRunner<'a> {
    inner: TestRunnerInner<'a>,
    signal_handler: SignalHandler,
    input_handler: InputHandler,
}

impl<'a> TestRunner<'a> {
    /// Returns the status of the input handler.
    pub fn input_handler_status(&self) -> InputHandlerStatus {
        self.input_handler.status()
    }

    /// Executes the listed tests, each one in its own process.
    ///
    /// The callback is called with the results of each test.
    ///
    /// Returns an error if any of the tasks panicked.
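    ///
    /// A hedged usage sketch (runner construction elided):
    ///
    /// ```ignore
    /// // `runner` is a TestRunner built via TestRunnerBuilder::build.
    /// let run_stats = runner.execute(|event| {
    ///     // Hand each TestEvent off to a reporter here.
    ///     let _ = event;
    /// })?;
    /// ```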
    pub fn execute<F>(
        self,
        mut callback: F,
    ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
    where
        F: FnMut(TestEvent<'a>) + Send,
    {
        self.try_execute::<Infallible, _>(|test_event| {
            callback(test_event);
            Ok(())
        })
    }

    /// Executes the listed tests, each one in its own process.
    ///
    /// Accepts a callback that is called with the results of each test. If the callback returns an
    /// error, the test run terminates and the callback is no longer called.
    ///
    /// Returns an error if any of the tasks panicked.
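    ///
    /// A hedged sketch, assuming a hypothetical fallible `write_event` sink
    /// for test events:
    ///
    /// ```ignore
    /// // Returning Err from the callback asks the runner to begin shutting down.
    /// let result = runner.try_execute(|event| write_event(event));
    /// ```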
    pub fn try_execute<E, F>(
        mut self,
        mut callback: F,
    ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
    where
        F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
        E: fmt::Debug + Send,
    {
        let (report_cancel_tx, report_cancel_rx) = oneshot::channel();

        // If report_cancel_tx is None, at least one error has occurred and the
        // runner has been instructed to shut down. first_error is also set to
        // Some in that case.
        let mut report_cancel_tx = Some(report_cancel_tx);
        let mut first_error = None;

        let res = self.inner.execute(
            &mut self.signal_handler,
            &mut self.input_handler,
            report_cancel_rx,
            |event| {
                match callback(event) {
                    Ok(()) => {}
                    Err(error) => {
                        // If the callback fails, we need to let the runner know to start shutting
                        // down. But we keep reporting results in case the callback starts working
                        // again.
                        if let Some(report_cancel_tx) = report_cancel_tx.take() {
                            let _ = report_cancel_tx.send(());
                            first_error = Some(error);
                        }
                    }
                }
            },
        );

        // On Windows, the stdout and stderr futures might spawn processes that keep the runner
        // stuck indefinitely if it's dropped the normal way. Shut it down aggressively, being OK
        // with leaked resources.
        self.inner.runtime.shutdown_background();

        match (res, first_error) {
            (Ok(run_stats), None) => Ok(run_stats),
            (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
                report_error: Some(report_error),
                join_errors: Vec::new(),
            }),
            (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
                report_error,
                join_errors,
            }),
        }
    }
}

#[derive(Debug)]
struct TestRunnerInner<'a> {
    run_id: ReportUuid,
    profile: &'a EvaluatableProfile<'a>,
    test_list: &'a TestList<'a>,
    test_threads: usize,
    double_spawn: DoubleSpawnInfo,
    target_runner: TargetRunner,
    capture_strategy: CaptureStrategy,
    force_retries: Option<RetryPolicy>,
    cli_args: Vec<String>,
    max_fail: MaxFail,
    stress_condition: Option<StressCondition>,
    runtime: Runtime,
}

impl<'a> TestRunnerInner<'a> {
    fn execute<F>(
        &self,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: oneshot::Receiver<()>,
        callback: F,
    ) -> Result<RunStats, Vec<JoinError>>
    where
        F: FnMut(TestEvent<'a>) + Send,
    {
        // TODO: add support for other test-running approaches, measure performance.

        let mut dispatcher_cx = DispatcherContext::new(
            callback,
            self.run_id,
            self.profile.name(),
            self.cli_args.clone(),
            self.test_list.run_count(),
            self.max_fail,
            self.profile.global_timeout().period,
            self.stress_condition.clone(),
        );

        let executor_cx = ExecutorContext::new(
            self.run_id,
            self.profile,
            self.test_list,
            self.double_spawn.clone(),
            self.target_runner.clone(),
            self.capture_strategy,
            self.force_retries,
        );

        // Send the initial event.
        dispatcher_cx.run_started(self.test_list);

        let _guard = self.runtime.enter();

        let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());

        if self.stress_condition.is_some() {
            loop {
                let progress = dispatcher_cx
                    .stress_progress()
                    .expect("stress_condition is Some => stress progress is Some");
                if progress.remaining().is_some() {
                    dispatcher_cx.stress_sub_run_started(progress);

                    self.do_run(
                        dispatcher_cx.stress_index(),
                        &mut dispatcher_cx,
                        &executor_cx,
                        signal_handler,
                        input_handler,
                        report_cancel_rx.as_mut(),
                    )?;

                    dispatcher_cx.stress_sub_run_finished();

                    if dispatcher_cx.cancel_reason().is_some() {
                        break;
                    }
                } else {
                    break;
                }
            }
        } else {
            self.do_run(
                None,
                &mut dispatcher_cx,
                &executor_cx,
                signal_handler,
                input_handler,
                report_cancel_rx,
            )?;
        }

        dispatcher_cx.run_finished();

        Ok(dispatcher_cx.run_stats())
    }

    fn do_run<F>(
        &self,
        stress_index: Option<StressIndex>,
        dispatcher_cx: &mut DispatcherContext<'a, F>,
        executor_cx: &ExecutorContext<'a>,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
    ) -> Result<(), Vec<JoinError>>
    where
        F: FnMut(TestEvent<'a>) + Send,
    {
        let ((), results) = TokioScope::scope_and_block(move |scope| {
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Run the dispatcher to completion in a task.
            let dispatcher_fut =
                dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                // Since script tasks are run serially, we just reuse the one
                // script task.
                let script_data = executor_cx
                    .run_setup_scripts(stress_index, script_resp_tx)
                    .await;
                if script_tx.send(script_data).is_err() {
                    // The dispatcher has shut down, so we should too.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            let Some(script_data) = script_rx.blocking_recv() else {
                // Most likely the harness is shutting down, so we should too.
                debug!("no script data received, shutting down");
                return;
            };

            // groups is going to be passed to future_queue_grouped.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    // Filter tests before assigning a FutureQueueContext to
                    // them.
                    //
                    // Note that this function is called lazily due to the
                    // `future_queue_grouped` below. This means that skip
                    // notifications will go out as tests are iterated over, not
                    // all at once.
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // Failure to send means the receiver was dropped.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                stress_index,
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        // Use a separate Tokio task for each test. For repos
                        // with lots of small tests, this has been observed to
                        // be much faster than using a single task for all tests
                        // (what we used to do). It also provides some degree of
                        // per-test isolation.
                        async move {
                            // SAFETY: Within an outer scope_and_block (which we
                            // have here), scope_and_collect is safe as long as
                            // the returned future isn't forgotten. We're not
                            // forgetting it below -- we're running it to
                            // completion immediately.
                            //
                            // But recursive scoped calls really feel like
                            // pushing against the limits of async-scoped. For
                            // example, there's no way built into async-scoped
                            // to propagate a cancellation signal from the outer
                            // scope to the inner scope. (But there could be,
                            // right? That seems solvable via channels. And we
                            // could likely do our own channels here.)
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx.run_test_instance(
                                        stress_index,
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // If no future was started, that's really strange.
                            // Worth at least logging.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                // future_queue_grouped means tests are spawned in the order
                // defined, but returned in any order.
                .future_queue_grouped(self.test_threads, groups)
                // Drop the None values.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                // Interestingly, using a more idiomatic `async move {
                // run_tests_fut.await ... }` block causes Rust 1.83 to complain
                // about a weird lifetime mismatch. FutureExt::map as used below
                // does not.
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        // Were there any join errors in tasks?
        //
        // If one of the tasks panics, we likely end up stuck because the
        // dispatcher, which is spawned in the same async-scoped block, doesn't
        // get relayed the panic immediately. That should probably be fixed at
        // some point.
        let mut cancelled_count = 0;
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    // Largely ignore cancelled tasks since it most likely means
                    // shutdown -- we don't cancel tasks manually.
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }

        Ok(())
    }
}

/// Configures stdout, stdin and stderr inheritance by test processes on Windows.
///
/// With Rust on Windows, these handles can be held open by tests (and therefore by grandchild processes)
/// even if we run the tests with `Stdio::inherit`. This can cause problems with leaky tests.
///
/// This changes global state on the Win32 side, so the application must manage mutual exclusion
/// around it. Call this right before [`TestRunner::try_execute`].
///
/// This is a no-op on non-Windows platforms.
///
/// See [this issue on the Rust repository](https://github.com/rust-lang/rust/issues/54760) for more
/// discussion.
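///
/// A hedged sketch of the intended call order (runner and callback
/// construction elided):
///
/// ```ignore
/// // `no_capture` mirrors whether CaptureStrategy::None is in use.
/// configure_handle_inheritance(no_capture)?;
/// let result = runner.try_execute(callback);
/// ```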
pub fn configure_handle_inheritance(
    no_capture: bool,
) -> Result<(), ConfigureHandleInheritanceError> {
    super::os::configure_handle_inheritance_impl(no_capture)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    #[test]
    fn no_capture_settings() {
        // Ensure that output settings are ignored with no-capture.
        let mut builder = TestRunnerBuilder::default();
        builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));
        let test_list = TestList::empty();
        let config = NextestConfig::default_config("/fake/dir");
        let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let signal_handler = SignalHandlerKind::Noop;
        let input_handler = InputHandlerKind::Noop;
        let profile = profile.apply_build_platforms(&build_platforms);
        let runner = builder
            .build(
                &test_list,
                &profile,
                vec![],
                signal_handler,
                input_handler,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();
        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }
}