nextest_runner/runner/
imp.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6    config::{
7        EvaluatableProfile, MaxFail, RetryPolicy, SetupScriptExecuteData, TestGroup, TestThreads,
8    },
9    double_spawn::DoubleSpawnInfo,
10    errors::{ConfigureHandleInheritanceError, TestRunnerBuildError, TestRunnerExecuteErrors},
11    input::{InputHandler, InputHandlerKind, InputHandlerStatus},
12    list::{TestInstanceWithSettings, TestList},
13    reporter::events::{RunStats, TestEvent},
14    runner::ExecutorEvent,
15    signal::{SignalHandler, SignalHandlerKind},
16    target_runner::TargetRunner,
17    test_output::CaptureStrategy,
18};
19use async_scoped::TokioScope;
20use future_queue::{FutureQueueContext, StreamExt};
21use futures::prelude::*;
22use nextest_metadata::FilterMatch;
23use quick_junit::ReportUuid;
24use std::{convert::Infallible, fmt, sync::Arc};
25use tokio::{
26    runtime::Runtime,
27    sync::{mpsc::unbounded_channel, oneshot},
28    task::JoinError,
29};
30use tracing::{debug, warn};
31
32/// Test runner options.
33#[derive(Debug, Default)]
34pub struct TestRunnerBuilder {
35    capture_strategy: CaptureStrategy,
36    retries: Option<RetryPolicy>,
37    max_fail: Option<MaxFail>,
38    test_threads: Option<TestThreads>,
39}
40
41impl TestRunnerBuilder {
42    /// Sets the capture strategy for the test runner
43    ///
44    /// * [`CaptureStrategy::Split`]
45    ///   * pro: output from `stdout` and `stderr` can be identified and easily split
46    ///   * con: ordering between the streams cannot be guaranteed
47    /// * [`CaptureStrategy::Combined`]
48    ///   * pro: output is guaranteed to be ordered as it would in a terminal emulator
49    ///   * con: distinction between `stdout` and `stderr` is lost
50    /// * [`CaptureStrategy::None`] -
51    ///   * In this mode, tests will always be run serially: `test_threads` will always be 1.
52    pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
53        self.capture_strategy = strategy;
54        self
55    }
56
57    /// Sets the number of retries for this test runner.
58    pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
59        self.retries = Some(retries);
60        self
61    }
62
63    /// Sets the max-fail value for this test runner.
64    pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
65        self.max_fail = Some(max_fail);
66        self
67    }
68
69    /// Sets the number of tests to run simultaneously.
70    pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
71        self.test_threads = Some(test_threads);
72        self
73    }
74
75    /// Creates a new test runner.
76    #[expect(clippy::too_many_arguments)]
77    pub fn build<'a>(
78        self,
79        test_list: &'a TestList,
80        profile: &'a EvaluatableProfile<'a>,
81        cli_args: Vec<String>,
82        signal_handler: SignalHandlerKind,
83        input_handler: InputHandlerKind,
84        double_spawn: DoubleSpawnInfo,
85        target_runner: TargetRunner,
86    ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
87        let test_threads = match self.capture_strategy {
88            CaptureStrategy::None => 1,
89            CaptureStrategy::Combined | CaptureStrategy::Split => self
90                .test_threads
91                .unwrap_or_else(|| profile.test_threads())
92                .compute(),
93        };
94        let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
95
96        let runtime = tokio::runtime::Builder::new_multi_thread()
97            .enable_all()
98            .thread_name("nextest-runner-worker")
99            .build()
100            .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
101        let _guard = runtime.enter();
102
103        // signal_handler.build() must be called from within the guard.
104        let signal_handler = signal_handler.build()?;
105
106        let input_handler = input_handler.build();
107
108        Ok(TestRunner {
109            inner: TestRunnerInner {
110                run_id: ReportUuid::new_v4(),
111                profile,
112                test_list,
113                test_threads,
114                double_spawn,
115                target_runner,
116                capture_strategy: self.capture_strategy,
117                force_retries: self.retries,
118                cli_args,
119                max_fail,
120                runtime,
121            },
122            signal_handler,
123            input_handler,
124        })
125    }
126}
127
128/// Context for running tests.
129///
130/// Created using [`TestRunnerBuilder::build`].
131#[derive(Debug)]
132pub struct TestRunner<'a> {
133    inner: TestRunnerInner<'a>,
134    signal_handler: SignalHandler,
135    input_handler: InputHandler,
136}
137
138impl<'a> TestRunner<'a> {
139    /// Returns the status of the input handler.
140    pub fn input_handler_status(&self) -> InputHandlerStatus {
141        self.input_handler.status()
142    }
143
144    /// Executes the listed tests, each one in its own process.
145    ///
146    /// The callback is called with the results of each test.
147    ///
148    /// Returns an error if any of the tasks panicked.
149    pub fn execute<F>(
150        self,
151        mut callback: F,
152    ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
153    where
154        F: FnMut(TestEvent<'a>) + Send,
155    {
156        self.try_execute::<Infallible, _>(|test_event| {
157            callback(test_event);
158            Ok(())
159        })
160    }
161
162    /// Executes the listed tests, each one in its own process.
163    ///
164    /// Accepts a callback that is called with the results of each test. If the callback returns an
165    /// error, the test run terminates and the callback is no longer called.
166    ///
167    /// Returns an error if any of the tasks panicked.
168    pub fn try_execute<E, F>(
169        mut self,
170        mut callback: F,
171    ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
172    where
173        F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
174        E: fmt::Debug + Send,
175    {
176        let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
177
178        // If report_cancel_tx is None, at least one error has occurred and the
179        // runner has been instructed to shut down. first_error is also set to
180        // Some in that case.
181        let mut report_cancel_tx = Some(report_cancel_tx);
182        let mut first_error = None;
183
184        let res = self.inner.execute(
185            &mut self.signal_handler,
186            &mut self.input_handler,
187            report_cancel_rx,
188            |event| {
189                match callback(event) {
190                    Ok(()) => {}
191                    Err(error) => {
192                        // If the callback fails, we need to let the runner know to start shutting
193                        // down. But we keep reporting results in case the callback starts working
194                        // again.
195                        if let Some(report_cancel_tx) = report_cancel_tx.take() {
196                            let _ = report_cancel_tx.send(());
197                            first_error = Some(error);
198                        }
199                    }
200                }
201            },
202        );
203
204        // On Windows, the stdout and stderr futures might spawn processes that keep the runner
205        // stuck indefinitely if it's dropped the normal way. Shut it down aggressively, being OK
206        // with leaked resources.
207        self.inner.runtime.shutdown_background();
208
209        match (res, first_error) {
210            (Ok(run_stats), None) => Ok(run_stats),
211            (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
212                report_error: Some(report_error),
213                join_errors: Vec::new(),
214            }),
215            (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
216                report_error,
217                join_errors,
218            }),
219        }
220    }
221}
222
/// State shared by a single test run.
///
/// Built by `TestRunnerBuilder::build` and read by `TestRunnerInner::execute`.
#[derive(Debug)]
struct TestRunnerInner<'a> {
    /// Identifier for this run; generated with `ReportUuid::new_v4()` at build time.
    run_id: ReportUuid,
    /// Profile the run is evaluated against.
    profile: &'a EvaluatableProfile<'a>,
    /// The tests to run.
    test_list: &'a TestList<'a>,
    /// Number of test threads; always 1 when the capture strategy is `CaptureStrategy::None`.
    test_threads: usize,
    /// Passed (cloned) to the executor context.
    double_spawn: DoubleSpawnInfo,
    /// Passed (cloned) to the executor context.
    target_runner: TargetRunner,
    /// How output from test processes is captured.
    capture_strategy: CaptureStrategy,
    /// Retry policy set on the builder, if any; passed to the executor context.
    force_retries: Option<RetryPolicy>,
    /// Command-line arguments; passed (cloned) to the dispatcher context.
    cli_args: Vec<String>,
    /// Max-fail value: the builder's override, or the profile's value.
    max_fail: MaxFail,
    /// Tokio runtime on which the run is driven.
    runtime: Runtime,
}
237
impl<'a> TestRunnerInner<'a> {
    /// Runs the setup scripts and then the tests in `test_list`, blocking the
    /// calling thread until the dispatcher and all spawned tasks are done.
    ///
    /// * `signal_handler`, `input_handler` and `report_cancel_rx` are handed to
    ///   the dispatcher. The sending half of `report_cancel_rx` is fired by
    ///   `TestRunner::try_execute` when the reporter callback returns an error.
    /// * `callback` is given to the dispatcher context, which uses it to report
    ///   test events.
    ///
    /// Returns the dispatcher's run statistics, or the list of join errors if
    /// any spawned task (or one of its child tasks) failed to join.
    fn execute<F>(
        &self,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: oneshot::Receiver<()>,
        callback: F,
    ) -> Result<RunStats, Vec<JoinError>>
    where
        F: FnMut(TestEvent<'a>) + Send,
    {
        // TODO: add support for other test-running approaches, measure performance.

        let mut dispatcher_cx = DispatcherContext::new(
            callback,
            self.run_id,
            self.profile.name(),
            self.cli_args.clone(),
            self.test_list.run_count(),
            self.max_fail,
        );

        let executor_cx = ExecutorContext::new(
            self.run_id,
            self.profile,
            self.test_list,
            self.double_spawn.clone(),
            self.target_runner.clone(),
            self.capture_strategy,
            self.force_retries,
        );

        // Send the initial event.
        // (Don't need to set the cancelled atomic if this fails because the run hasn't started
        // yet.)
        dispatcher_cx.run_started(self.test_list);

        // Take references up front so the `move` closure below captures the
        // references rather than the contexts themselves; `dispatcher_cx` is
        // used again after the scope ends.
        let executor_cx_ref = &executor_cx;
        let dispatcher_cx_mut = &mut dispatcher_cx;

        // NOTE(review): presumably entered so that tasks spawned by the scope
        // below run on this runner's runtime -- confirm against async-scoped.
        let _guard = self.runtime.enter();

        // Three tasks are spawned into the scope: the dispatcher, the setup
        // script runner, and the test runner. `results` holds one entry per
        // spawned task.
        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // Executor events flow to the dispatcher over this channel.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Run the dispatcher to completion in a task.
            let dispatcher_fut =
                dispatcher_cx_mut.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            // The setup script task hands its result back over this channel.
            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                // Since script tasks are run serially, we just reuse the one
                // script task.
                let script_data = executor_cx_ref.run_setup_scripts(script_resp_tx).await;
                if script_tx.send(script_data).is_err() {
                    // The dispatcher has shut down, so we should too.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block until the setup script task delivers its data. `None`
            // means the sender was dropped without sending anything.
            let Some(script_data) = script_rx.blocking_recv() else {
                // Most likely the harness is shutting down, so we should too.
                debug!("no script data received, shutting down");
                return;
            };

            // groups is going to be passed to future_queue_grouped.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            // Shared between all test tasks.
            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    // Filter tests before assigning a FutureQueueContext to
                    // them.
                    //
                    // Note that this function is called lazily due to the
                    // `future_queue_grouped` below. This means that skip
                    // notifications will go out as tests are iterated over, not
                    // all at once.
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            // Failure to send means the receiver was dropped.
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    // Each element becomes a (weight, group, future factory)
                    // triple for `future_queue_grouped`.
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    // Kept separately for logging, since `test` is moved into
                    // the spawned task below.
                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        // Use a separate Tokio task for each test. For repos
                        // with lots of small tests, this has been observed to
                        // be much faster than using a single task for all tests
                        // (what we used to do). It also provides some degree of
                        // per-test isolation.
                        async move {
                            // SAFETY: Within an outer scope_and_block (which we
                            // have here), scope_and_collect is safe as long as
                            // the returned future isn't forgotten. We're not
                            // forgetting it below -- we're running it to
                            // completion immediately.
                            //
                            // But recursive scoped calls really feel like
                            // pushing against the limits of async-scoped. For
                            // example, there's no way built into async-scoped
                            // to propagate a cancellation signal from the outer
                            // scope to the inner scope. (But there could be,
                            // right? That seems solvable via channels. And we
                            // could likely do our own channels here.)
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx_ref.run_test_instance(
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // If no future was started, that's really strange.
                            // Worth at least logging.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            // Keep only the join error, if there was one.
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                // future_queue_grouped means tests are spawned in the order
                // defined, but returned in any order.
                .future_queue_grouped(self.test_threads, groups)
                // Drop the None values.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                // Interestingly, using a more idiomatic `async move {
                // run_tests_fut.await ... }` block causes Rust 1.83 to complain
                // about a weird lifetime mismatch. FutureExt::map as used below
                // does not.
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        dispatcher_cx.run_finished();

        // Were there any join errors in tasks?
        //
        // If one of the tasks panics, we likely end up stuck because the
        // dispatcher, which is spawned in the same async-scoped block, doesn't
        // get relayed the panic immediately. That should probably be fixed at
        // some point.
        let mut cancelled_count = 0;
        // Flatten the join errors of the top-level tasks and of their children
        // into a single list.
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    // Largely ignore cancelled tasks since it most likely means
                    // shutdown -- we don't cancel tasks manually.
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }
        Ok(dispatcher_cx.run_stats())
    }
}
457
/// Configures stdout, stdin and stderr inheritance by test processes on Windows.
///
/// With Rust on Windows, these handles can be held open by tests (and therefore by grandchild
/// processes) even if we run the tests with `Stdio::inherit`. This can cause problems with leaky
/// tests.
///
/// This changes global state on the Win32 side, so the application must manage mutual exclusion
/// around it. Call this right before [`TestRunner::try_execute`].
///
/// This is a no-op on non-Windows platforms.
///
/// `no_capture` is forwarded unchanged to the platform-specific implementation.
/// NOTE(review): presumably it indicates that tests run without output capture -- confirm against
/// the `os` module.
///
/// # Errors
///
/// Returns a [`ConfigureHandleInheritanceError`] if the platform-specific implementation reports
/// a failure.
///
/// See [this issue on the Rust repository](https://github.com/rust-lang/rust/issues/54760) for more
/// discussion.
pub fn configure_handle_inheritance(
    no_capture: bool,
) -> Result<(), ConfigureHandleInheritanceError> {
    super::os::configure_handle_inheritance_impl(no_capture)
}
475
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::NextestConfig, platform::BuildPlatforms};

    /// With no-capture, the requested thread count must be ignored and the
    /// runner must fall back to a single test thread.
    #[test]
    fn no_capture_settings() {
        // Fixtures: an empty test list and the default profile with no target platform.
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let config = NextestConfig::default_config("/fake/dir");
        let profile = config
            .profile(NextestConfig::DEFAULT_PROFILE)
            .unwrap()
            .apply_build_platforms(&build_platforms);
        let test_list = TestList::empty();

        // Ask for 20 threads while disabling capture.
        let mut builder = TestRunnerBuilder::default();
        builder.set_capture_strategy(CaptureStrategy::None);
        builder.set_test_threads(TestThreads::Count(20));

        let built = builder.build(
            &test_list,
            &profile,
            vec![],
            SignalHandlerKind::Noop,
            InputHandlerKind::Noop,
            DoubleSpawnInfo::disabled(),
            TargetRunner::empty(),
        );
        let runner = built.unwrap();

        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }
}
509}