nextest_runner/runner/imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6    config::{
7        EvaluatableProfile, MaxFail, RetryPolicy, SetupScriptExecuteData, TestGroup, TestThreads,
8    },
9    double_spawn::DoubleSpawnInfo,
10    errors::{ConfigureHandleInheritanceError, TestRunnerBuildError, TestRunnerExecuteErrors},
11    input::{InputHandler, InputHandlerKind, InputHandlerStatus},
12    list::{TestInstanceWithSettings, TestList},
13    reporter::events::{RunStats, TestEvent},
14    runner::ExecutorEvent,
15    signal::{SignalHandler, SignalHandlerKind},
16    target_runner::TargetRunner,
17    test_output::CaptureStrategy,
18};
19use async_scoped::TokioScope;
20use future_queue::{FutureQueueContext, StreamExt};
21use futures::prelude::*;
22use nextest_metadata::FilterMatch;
23use quick_junit::ReportUuid;
24use std::{convert::Infallible, fmt, sync::Arc};
25use tokio::{
26    runtime::Runtime,
27    sync::{mpsc::unbounded_channel, oneshot},
28    task::JoinError,
29};
30use tracing::{debug, warn};
31
/// Test runner options.
///
/// Start from [`Default::default`], adjust settings with the `set_*` methods, then call
/// [`TestRunnerBuilder::build`] to create a [`TestRunner`].
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    /// How test output is captured. `CaptureStrategy::None` forces a single test thread in
    /// `build`.
    capture_strategy: CaptureStrategy,
    /// Retry policy handed to the runner as `force_retries`; `None` if never set.
    retries: Option<RetryPolicy>,
    /// Max-fail override; `None` falls back to the profile's value in `build`.
    max_fail: Option<MaxFail>,
    /// Test-thread override; `None` falls back to the profile's value in `build`.
    test_threads: Option<TestThreads>,
}
40
41impl TestRunnerBuilder {
42    /// Sets the capture strategy for the test runner
43    ///
44    /// * [`CaptureStrategy::Split`]
45    ///   * pro: output from `stdout` and `stderr` can be identified and easily split
46    ///   * con: ordering between the streams cannot be guaranteed
47    /// * [`CaptureStrategy::Combined`]
48    ///   * pro: output is guaranteed to be ordered as it would in a terminal emulator
49    ///   * con: distinction between `stdout` and `stderr` is lost
50    /// * [`CaptureStrategy::None`] -
51    ///   * In this mode, tests will always be run serially: `test_threads` will always be 1.
52    pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
53        self.capture_strategy = strategy;
54        self
55    }
56
57    /// Sets the number of retries for this test runner.
58    pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
59        self.retries = Some(retries);
60        self
61    }
62
63    /// Sets the max-fail value for this test runner.
64    pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
65        self.max_fail = Some(max_fail);
66        self
67    }
68
69    /// Sets the number of tests to run simultaneously.
70    pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
71        self.test_threads = Some(test_threads);
72        self
73    }
74
75    /// Creates a new test runner.
76    #[expect(clippy::too_many_arguments)]
77    pub fn build<'a>(
78        self,
79        test_list: &'a TestList,
80        profile: &'a EvaluatableProfile<'a>,
81        cli_args: Vec<String>,
82        signal_handler: SignalHandlerKind,
83        input_handler: InputHandlerKind,
84        double_spawn: DoubleSpawnInfo,
85        target_runner: TargetRunner,
86    ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
87        let test_threads = match self.capture_strategy {
88            CaptureStrategy::None => 1,
89            CaptureStrategy::Combined | CaptureStrategy::Split => self
90                .test_threads
91                .unwrap_or_else(|| profile.test_threads())
92                .compute(),
93        };
94        let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
95
96        let runtime = tokio::runtime::Builder::new_multi_thread()
97            .enable_all()
98            .thread_name("nextest-runner-worker")
99            .build()
100            .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
101        let _guard = runtime.enter();
102
103        // signal_handler.build() must be called from within the guard.
104        let signal_handler = signal_handler.build()?;
105
106        let input_handler = input_handler.build();
107
108        Ok(TestRunner {
109            inner: TestRunnerInner {
110                run_id: ReportUuid::new_v4(),
111                profile,
112                test_list,
113                test_threads,
114                double_spawn,
115                target_runner,
116                capture_strategy: self.capture_strategy,
117                force_retries: self.retries,
118                cli_args,
119                max_fail,
120                runtime,
121            },
122            signal_handler,
123            input_handler,
124        })
125    }
126}
127
/// Context for running tests.
///
/// Created using [`TestRunnerBuilder::build`].
#[derive(Debug)]
pub struct TestRunner<'a> {
    /// Run configuration plus the Tokio runtime that tests are executed on.
    inner: TestRunnerInner<'a>,
    /// Signal handler, built inside the runtime's context by `TestRunnerBuilder::build`.
    signal_handler: SignalHandler,
    /// Input handler; its status is exposed through `input_handler_status`.
    input_handler: InputHandler,
}
137
138impl<'a> TestRunner<'a> {
139    /// Returns the status of the input handler.
140    pub fn input_handler_status(&self) -> InputHandlerStatus {
141        self.input_handler.status()
142    }
143
144    /// Executes the listed tests, each one in its own process.
145    ///
146    /// The callback is called with the results of each test.
147    ///
148    /// Returns an error if any of the tasks panicked.
149    pub fn execute<F>(
150        self,
151        mut callback: F,
152    ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
153    where
154        F: FnMut(TestEvent<'a>) + Send,
155    {
156        self.try_execute::<Infallible, _>(|test_event| {
157            callback(test_event);
158            Ok(())
159        })
160    }
161
162    /// Executes the listed tests, each one in its own process.
163    ///
164    /// Accepts a callback that is called with the results of each test. If the callback returns an
165    /// error, the test run terminates and the callback is no longer called.
166    ///
167    /// Returns an error if any of the tasks panicked.
168    pub fn try_execute<E, F>(
169        mut self,
170        mut callback: F,
171    ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
172    where
173        F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
174        E: fmt::Debug + Send,
175    {
176        let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
177
178        // If report_cancel_tx is None, at least one error has occurred and the
179        // runner has been instructed to shut down. first_error is also set to
180        // Some in that case.
181        let mut report_cancel_tx = Some(report_cancel_tx);
182        let mut first_error = None;
183
184        let res = self.inner.execute(
185            &mut self.signal_handler,
186            &mut self.input_handler,
187            report_cancel_rx,
188            |event| {
189                match callback(event) {
190                    Ok(()) => {}
191                    Err(error) => {
192                        // If the callback fails, we need to let the runner know to start shutting
193                        // down. But we keep reporting results in case the callback starts working
194                        // again.
195                        if let Some(report_cancel_tx) = report_cancel_tx.take() {
196                            let _ = report_cancel_tx.send(());
197                            first_error = Some(error);
198                        }
199                    }
200                }
201            },
202        );
203
204        // On Windows, the stdout and stderr futures might spawn processes that keep the runner
205        // stuck indefinitely if it's dropped the normal way. Shut it down aggressively, being OK
206        // with leaked resources.
207        self.inner.runtime.shutdown_background();
208
209        match (res, first_error) {
210            (Ok(run_stats), None) => Ok(run_stats),
211            (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
212                report_error: Some(report_error),
213                join_errors: Vec::new(),
214            }),
215            (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
216                report_error,
217                join_errors,
218            }),
219        }
220    }
221}
222
/// Resolved configuration and runtime shared by a [`TestRunner`].
#[derive(Debug)]
struct TestRunnerInner<'a> {
    /// Identifier for this run, generated with `ReportUuid::new_v4` in `TestRunnerBuilder::build`.
    run_id: ReportUuid,
    profile: &'a EvaluatableProfile<'a>,
    test_list: &'a TestList<'a>,
    /// Number of tests to run simultaneously, already computed (always 1 without capture).
    test_threads: usize,
    /// Cloned into the executor context for each run.
    double_spawn: DoubleSpawnInfo,
    /// Cloned into the executor context for each run.
    target_runner: TargetRunner,
    capture_strategy: CaptureStrategy,
    /// Retry policy set on the builder, if any; passed to the executor context.
    force_retries: Option<RetryPolicy>,
    /// Command-line arguments; a clone is passed to the dispatcher context.
    cli_args: Vec<String>,
    /// Max-fail setting: the builder's override, or else the profile's value.
    max_fail: MaxFail,
    /// Multi-threaded Tokio runtime; `TestRunner::try_execute` shuts it down with
    /// `shutdown_background` once execution finishes.
    runtime: Runtime,
}
237
impl<'a> TestRunnerInner<'a> {
    /// Runs the tests in `self.test_list` on `self.runtime`.
    ///
    /// A dispatcher context is built around `callback` and receives executor events over a
    /// channel. An executor context runs the setup scripts and then each test instance. All
    /// tasks are driven to completion inside `TokioScope::scope_and_block`.
    ///
    /// `report_cancel_rx` is handed to the dispatcher. `TestRunner::try_execute` sends on the
    /// matching sender when the reporting callback fails.
    ///
    /// Returns the dispatcher's final [`RunStats`], or every [`JoinError`] collected from the
    /// spawned tasks and their per-test child tasks.
    fn execute<F>(
        &self,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: oneshot::Receiver<()>,
        callback: F,
    ) -> Result<RunStats, Vec<JoinError>>
    where
        F: FnMut(TestEvent<'a>) + Send,
    {
        // TODO: add support for other test-running approaches, measure performance.

        // The dispatcher context owns the callback and consumes executor events.
        let mut dispatcher_cx = DispatcherContext::new(
            callback,
            self.run_id,
            self.profile.name(),
            self.cli_args.clone(),
            self.test_list.run_count(),
            self.max_fail,
            self.profile.global_timeout().period,
        );

        // The executor context runs setup scripts and individual test instances.
        let executor_cx = ExecutorContext::new(
            self.run_id,
            self.profile,
            self.test_list,
            self.double_spawn.clone(),
            self.target_runner.clone(),
            self.capture_strategy,
            self.force_retries,
        );

        // Send the initial run-started event. This happens before any tasks are spawned, so
        // there is nothing to cancel at this point.
        dispatcher_cx.run_started(self.test_list);

        // Take references up front so the `move` closure below captures borrows, not the
        // contexts themselves: `dispatcher_cx` is still needed after the scope ends.
        let executor_cx_ref = &executor_cx;
        let dispatcher_cx_mut = &mut dispatcher_cx;

        // Enter the runner's runtime for the rest of this function.
        let _guard = self.runtime.enter();

        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // Setup-script events, skip notifications and test results all flow through this
            // channel to the dispatcher.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Run the dispatcher to completion in a task.
            let dispatcher_fut =
                dispatcher_cx_mut.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                // Since script tasks are run serially, we just reuse the one
                // script task.
                let script_data = executor_cx_ref.run_setup_scripts(script_resp_tx).await;
                if script_tx.send(script_data).is_err() {
                    // The dispatcher has shut down, so we should too.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block until the setup scripts have produced their data: every test below needs
            // a handle to it.
            let Some(script_data) = script_rx.blocking_recv() else {
                // Most likely the harness is shutting down, so we should too.
                debug!("no script data received, shutting down");
                return;
            };

            // groups is going to be passed to future_queue_grouped.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    // Filter tests before assigning a FutureQueueContext to
                    // them.
                    //
                    // Note that this function is called lazily due to the
                    // `future_queue_grouped` below. This means that skip
                    // notifications will go out as tests are iterated over, not
                    // all at once.
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // Failure to send means the receiver was dropped.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                // Turn each remaining test into the (threads required, test group, task
                // factory) triple that `future_queue_grouped` consumes.
                .map(move |test: TestInstanceWithSettings<'a>| {
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        // Use a separate Tokio task for each test. For repos
                        // with lots of small tests, this has been observed to
                        // be much faster than using a single task for all tests
                        // (what we used to do). It also provides some degree of
                        // per-test isolation.
                        async move {
                            // SAFETY: Within an outer scope_and_block (which we
                            // have here), scope_and_collect is safe as long as
                            // the returned future isn't forgotten. We're not
                            // forgetting it below -- we're running it to
                            // completion immediately.
                            //
                            // But recursive scoped calls really feel like
                            // pushing against the limits of async-scoped. For
                            // example, there's no way built into async-scoped
                            // to propagate a cancellation signal from the outer
                            // scope to the inner scope. (But there could be,
                            // right? That seems solvable via channels. And we
                            // could likely do our own channels here.)
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    // NOTE(review): `resp_tx` is already a per-test clone,
                                    // so the `.clone()` below looks redundant.
                                    scope.spawn(executor_cx_ref.run_test_instance(
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // If no future was started, that's really strange.
                            // Worth at least logging.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            // Keep only the inner task's join error, if any; `None`
                            // values are filtered out below.
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                // future_queue_grouped means tests are spawned in the order
                // defined, but returned in any order.
                .future_queue_grouped(self.test_threads, groups)
                // Drop the None values.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                // Interestingly, using a more idiomatic `async move {
                // run_tests_fut.await ... }` block causes Rust 1.83 to complain
                // about a weird lifetime mismatch. FutureExt::map as used below
                // does not.
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        dispatcher_cx.run_finished();

        // Were there any join errors in tasks?
        //
        // If one of the tasks panics, we likely end up stuck because the
        // dispatcher, which is spawned in the same async-scoped block, doesn't
        // get relayed the panic immediately. That should probably be fixed at
        // some point.
        let mut cancelled_count = 0;
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    // Largely ignore cancelled tasks since it most likely means
                    // shutdown -- we don't cancel tasks manually.
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }
        Ok(dispatcher_cx.run_stats())
    }
}
458
459/// Configures stdout, stdin and stderr inheritance by test processes on Windows.
460///
461/// With Rust on Windows, these handles can be held open by tests (and therefore by grandchild processes)
462/// even if we run the tests with `Stdio::inherit`. This can cause problems with leaky tests.
463///
464/// This changes global state on the Win32 side, so the application must manage mutual exclusion
465/// around it. Call this right before [`TestRunner::try_execute`].
466///
467/// This is a no-op on non-Windows platforms.
468///
469/// See [this issue on the Rust repository](https://github.com/rust-lang/rust/issues/54760) for more
470/// discussion.
471pub fn configure_handle_inheritance(
472    no_capture: bool,
473) -> Result<(), ConfigureHandleInheritanceError> {
474    super::os::configure_handle_inheritance_impl(no_capture)
475}
476
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::NextestConfig, platform::BuildPlatforms};

    #[test]
    fn no_capture_settings() {
        // Ensure that an explicit test-threads setting is ignored with no-capture: when output
        // is not captured, tests must always run serially.
        let mut builder = TestRunnerBuilder::default();
        builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));
        let test_list = TestList::empty();
        let config = NextestConfig::default_config("/fake/dir");
        let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let signal_handler = SignalHandlerKind::Noop;
        let input_handler = InputHandlerKind::Noop;
        let profile = profile.apply_build_platforms(&build_platforms);
        let runner = builder
            .build(
                &test_list,
                &profile,
                vec![],
                signal_handler,
                input_handler,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();
        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }
}