1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 EvaluatableProfile, MaxFail, RetryPolicy, SetupScriptExecuteData, TestGroup, TestThreads,
8 },
9 double_spawn::DoubleSpawnInfo,
10 errors::{ConfigureHandleInheritanceError, TestRunnerBuildError, TestRunnerExecuteErrors},
11 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
12 list::{TestInstanceWithSettings, TestList},
13 reporter::events::{RunStats, TestEvent},
14 runner::ExecutorEvent,
15 signal::{SignalHandler, SignalHandlerKind},
16 target_runner::TargetRunner,
17 test_output::CaptureStrategy,
18};
19use async_scoped::TokioScope;
20use future_queue::{FutureQueueContext, StreamExt};
21use futures::prelude::*;
22use nextest_metadata::FilterMatch;
23use quick_junit::ReportUuid;
24use std::{convert::Infallible, fmt, sync::Arc};
25use tokio::{
26 runtime::Runtime,
27 sync::{mpsc::unbounded_channel, oneshot},
28 task::JoinError,
29};
30use tracing::{debug, warn};
31
32#[derive(Debug, Default)]
34pub struct TestRunnerBuilder {
35 capture_strategy: CaptureStrategy,
36 retries: Option<RetryPolicy>,
37 max_fail: Option<MaxFail>,
38 test_threads: Option<TestThreads>,
39}
40
41impl TestRunnerBuilder {
42 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
53 self.capture_strategy = strategy;
54 self
55 }
56
57 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
59 self.retries = Some(retries);
60 self
61 }
62
63 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
65 self.max_fail = Some(max_fail);
66 self
67 }
68
69 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
71 self.test_threads = Some(test_threads);
72 self
73 }
74
75 #[expect(clippy::too_many_arguments)]
77 pub fn build<'a>(
78 self,
79 test_list: &'a TestList,
80 profile: &'a EvaluatableProfile<'a>,
81 cli_args: Vec<String>,
82 signal_handler: SignalHandlerKind,
83 input_handler: InputHandlerKind,
84 double_spawn: DoubleSpawnInfo,
85 target_runner: TargetRunner,
86 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
87 let test_threads = match self.capture_strategy {
88 CaptureStrategy::None => 1,
89 CaptureStrategy::Combined | CaptureStrategy::Split => self
90 .test_threads
91 .unwrap_or_else(|| profile.test_threads())
92 .compute(),
93 };
94 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
95
96 let runtime = tokio::runtime::Builder::new_multi_thread()
97 .enable_all()
98 .thread_name("nextest-runner-worker")
99 .build()
100 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
101 let _guard = runtime.enter();
102
103 let signal_handler = signal_handler.build()?;
105
106 let input_handler = input_handler.build();
107
108 Ok(TestRunner {
109 inner: TestRunnerInner {
110 run_id: ReportUuid::new_v4(),
111 profile,
112 test_list,
113 test_threads,
114 double_spawn,
115 target_runner,
116 capture_strategy: self.capture_strategy,
117 force_retries: self.retries,
118 cli_args,
119 max_fail,
120 runtime,
121 },
122 signal_handler,
123 input_handler,
124 })
125 }
126}
127
128#[derive(Debug)]
132pub struct TestRunner<'a> {
133 inner: TestRunnerInner<'a>,
134 signal_handler: SignalHandler,
135 input_handler: InputHandler,
136}
137
138impl<'a> TestRunner<'a> {
139 pub fn input_handler_status(&self) -> InputHandlerStatus {
141 self.input_handler.status()
142 }
143
144 pub fn execute<F>(
150 self,
151 mut callback: F,
152 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
153 where
154 F: FnMut(TestEvent<'a>) + Send,
155 {
156 self.try_execute::<Infallible, _>(|test_event| {
157 callback(test_event);
158 Ok(())
159 })
160 }
161
162 pub fn try_execute<E, F>(
169 mut self,
170 mut callback: F,
171 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
172 where
173 F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
174 E: fmt::Debug + Send,
175 {
176 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
177
178 let mut report_cancel_tx = Some(report_cancel_tx);
182 let mut first_error = None;
183
184 let res = self.inner.execute(
185 &mut self.signal_handler,
186 &mut self.input_handler,
187 report_cancel_rx,
188 |event| {
189 match callback(event) {
190 Ok(()) => {}
191 Err(error) => {
192 if let Some(report_cancel_tx) = report_cancel_tx.take() {
196 let _ = report_cancel_tx.send(());
197 first_error = Some(error);
198 }
199 }
200 }
201 },
202 );
203
204 self.inner.runtime.shutdown_background();
208
209 match (res, first_error) {
210 (Ok(run_stats), None) => Ok(run_stats),
211 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
212 report_error: Some(report_error),
213 join_errors: Vec::new(),
214 }),
215 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
216 report_error,
217 join_errors,
218 }),
219 }
220 }
221}
222
223#[derive(Debug)]
224struct TestRunnerInner<'a> {
225 run_id: ReportUuid,
226 profile: &'a EvaluatableProfile<'a>,
227 test_list: &'a TestList<'a>,
228 test_threads: usize,
229 double_spawn: DoubleSpawnInfo,
230 target_runner: TargetRunner,
231 capture_strategy: CaptureStrategy,
232 force_retries: Option<RetryPolicy>,
233 cli_args: Vec<String>,
234 max_fail: MaxFail,
235 runtime: Runtime,
236}
237
238impl<'a> TestRunnerInner<'a> {
239 fn execute<F>(
240 &self,
241 signal_handler: &mut SignalHandler,
242 input_handler: &mut InputHandler,
243 report_cancel_rx: oneshot::Receiver<()>,
244 callback: F,
245 ) -> Result<RunStats, Vec<JoinError>>
246 where
247 F: FnMut(TestEvent<'a>) + Send,
248 {
249 let mut dispatcher_cx = DispatcherContext::new(
252 callback,
253 self.run_id,
254 self.profile.name(),
255 self.cli_args.clone(),
256 self.test_list.run_count(),
257 self.max_fail,
258 );
259
260 let executor_cx = ExecutorContext::new(
261 self.run_id,
262 self.profile,
263 self.test_list,
264 self.double_spawn.clone(),
265 self.target_runner.clone(),
266 self.capture_strategy,
267 self.force_retries,
268 );
269
270 dispatcher_cx.run_started(self.test_list);
274
275 let executor_cx_ref = &executor_cx;
276 let dispatcher_cx_mut = &mut dispatcher_cx;
277
278 let _guard = self.runtime.enter();
279
280 let ((), results) = TokioScope::scope_and_block(move |scope| {
281 let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();
282
283 let dispatcher_fut =
285 dispatcher_cx_mut.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
286 scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
287
288 let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
289 let script_resp_tx = resp_tx.clone();
290 let run_scripts_fut = async move {
291 let script_data = executor_cx_ref.run_setup_scripts(script_resp_tx).await;
294 if script_tx.send(script_data).is_err() {
295 debug!("script_tx.send failed, shutting down");
297 }
298 RunnerTaskState::finished_no_children()
299 };
300 scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);
301
302 let Some(script_data) = script_rx.blocking_recv() else {
303 debug!("no script data received, shutting down");
305 return;
306 };
307
308 let groups = self
310 .profile
311 .test_group_config()
312 .iter()
313 .map(|(group_name, config)| (group_name, config.max_threads.compute()));
314
315 let setup_script_data = Arc::new(script_data);
316
317 let filter_resp_tx = resp_tx.clone();
318
319 let tests = self.test_list.to_priority_queue(self.profile);
320 let run_tests_fut = futures::stream::iter(tests)
321 .filter_map(move |test| {
322 let filter_resp_tx = filter_resp_tx.clone();
330 async move {
331 if let FilterMatch::Mismatch { reason } =
332 test.instance.test_info.filter_match
333 {
334 let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
336 test_instance: test.instance,
337 reason,
338 });
339 return None;
340 }
341 Some(test)
342 }
343 })
344 .map(move |test: TestInstanceWithSettings<'a>| {
345 let threads_required =
346 test.settings.threads_required().compute(self.test_threads);
347 let test_group = match test.settings.test_group() {
348 TestGroup::Global => None,
349 TestGroup::Custom(name) => Some(name.clone()),
350 };
351 let resp_tx = resp_tx.clone();
352 let setup_script_data = setup_script_data.clone();
353
354 let test_instance = test.instance;
355
356 let f = move |cx: FutureQueueContext| {
357 debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
358 async move {
364 let ((), mut ret) = unsafe {
378 TokioScope::scope_and_collect(move |scope| {
379 scope.spawn(executor_cx_ref.run_test_instance(
380 test,
381 cx,
382 resp_tx.clone(),
383 setup_script_data,
384 ))
385 })
386 }
387 .await;
388
389 let Some(result) = ret.pop() else {
392 warn!(
393 "no task was started for test instance: {}",
394 test_instance.id()
395 );
396 return None;
397 };
398 result.err()
399 }
400 };
401
402 (threads_required, test_group, f)
403 })
404 .future_queue_grouped(self.test_threads, groups)
407 .filter_map(std::future::ready)
409 .collect::<Vec<_>>()
410 .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });
415
416 scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
417 });
418
419 dispatcher_cx.run_finished();
420
421 let mut cancelled_count = 0;
428 let join_errors = results
429 .into_iter()
430 .flat_map(|r| {
431 match r {
432 Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
433 Ok(RunnerTaskState::Cancelled) => {
436 cancelled_count += 1;
437 Vec::new()
438 }
439 Err(join_error) => vec![join_error],
440 }
441 })
442 .collect::<Vec<_>>();
443
444 if cancelled_count > 0 {
445 debug!(
446 "{} tasks were cancelled -- this \
447 generally should only happen due to panics",
448 cancelled_count
449 );
450 }
451 if !join_errors.is_empty() {
452 return Err(join_errors);
453 }
454 Ok(dispatcher_cx.run_stats())
455 }
456}
457
458pub fn configure_handle_inheritance(
471 no_capture: bool,
472) -> Result<(), ConfigureHandleInheritanceError> {
473 super::os::configure_handle_inheritance_impl(no_capture)
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479 use crate::{config::NextestConfig, platform::BuildPlatforms};
480
481 #[test]
482 fn no_capture_settings() {
483 let mut builder = TestRunnerBuilder::default();
485 builder
486 .set_capture_strategy(CaptureStrategy::None)
487 .set_test_threads(TestThreads::Count(20));
488 let test_list = TestList::empty();
489 let config = NextestConfig::default_config("/fake/dir");
490 let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
491 let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
492 let signal_handler = SignalHandlerKind::Noop;
493 let input_handler = InputHandlerKind::Noop;
494 let profile = profile.apply_build_platforms(&build_platforms);
495 let runner = builder
496 .build(
497 &test_list,
498 &profile,
499 vec![],
500 signal_handler,
501 input_handler,
502 DoubleSpawnInfo::disabled(),
503 TargetRunner::empty(),
504 )
505 .unwrap();
506 assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
507 assert_eq!(runner.inner.test_threads, 1, "tests run serially");
508 }
509}