1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 core::EvaluatableProfile,
8 elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9 scripts::SetupScriptExecuteData,
10 },
11 double_spawn::DoubleSpawnInfo,
12 errors::{
13 ConfigureHandleInheritanceError, StressCountParseError, TestRunnerBuildError,
14 TestRunnerExecuteErrors,
15 },
16 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17 list::{TestInstanceWithSettings, TestList},
18 reporter::events::{RunStats, StressIndex, TestEvent},
19 runner::ExecutorEvent,
20 signal::{SignalHandler, SignalHandlerKind},
21 target_runner::TargetRunner,
22 test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use future_queue::{FutureQueueContext, StreamExt};
26use futures::{future::Fuse, prelude::*};
27use nextest_metadata::FilterMatch;
28use quick_junit::ReportUuid;
29use std::{
30 convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr, sync::Arc, time::Duration,
31};
32use tokio::{
33 runtime::Runtime,
34 sync::{mpsc::unbounded_channel, oneshot},
35 task::JoinError,
36};
37use tracing::{debug, warn};
38
/// Builder for a [`TestRunner`].
///
/// Every field except `capture_strategy` is an optional override. Unset
/// `max_fail` and `test_threads` values are filled in from the profile when
/// [`TestRunnerBuilder::build`] is called.
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    /// How test output is captured. With `CaptureStrategy::None`, `build`
    /// always uses a single test thread.
    capture_strategy: CaptureStrategy,
    /// Retry policy override; stored on the runner as `force_retries`.
    retries: Option<RetryPolicy>,
    /// Max-fail override; `build` uses the profile's value when this is `None`.
    max_fail: Option<MaxFail>,
    /// Test-thread override; `build` uses the profile's value when this is
    /// `None` (and output is being captured).
    test_threads: Option<TestThreads>,
    /// Stress condition, if any. When set, the runner executes a loop of
    /// stress sub-runs instead of a single run.
    stress_condition: Option<StressCondition>,
}
48
49impl TestRunnerBuilder {
50 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
61 self.capture_strategy = strategy;
62 self
63 }
64
65 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
67 self.retries = Some(retries);
68 self
69 }
70
71 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
73 self.max_fail = Some(max_fail);
74 self
75 }
76
77 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
79 self.test_threads = Some(test_threads);
80 self
81 }
82
83 pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
85 self.stress_condition = Some(stress_condition);
86 self
87 }
88
89 #[expect(clippy::too_many_arguments)]
91 pub fn build<'a>(
92 self,
93 test_list: &'a TestList,
94 profile: &'a EvaluatableProfile<'a>,
95 cli_args: Vec<String>,
96 signal_handler: SignalHandlerKind,
97 input_handler: InputHandlerKind,
98 double_spawn: DoubleSpawnInfo,
99 target_runner: TargetRunner,
100 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
101 let test_threads = match self.capture_strategy {
102 CaptureStrategy::None => 1,
103 CaptureStrategy::Combined | CaptureStrategy::Split => self
104 .test_threads
105 .unwrap_or_else(|| profile.test_threads())
106 .compute(),
107 };
108 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
109
110 let runtime = tokio::runtime::Builder::new_multi_thread()
111 .enable_all()
112 .thread_name("nextest-runner-worker")
113 .build()
114 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
115 let _guard = runtime.enter();
116
117 let signal_handler = signal_handler.build()?;
119
120 let input_handler = input_handler.build();
121
122 Ok(TestRunner {
123 inner: TestRunnerInner {
124 run_id: ReportUuid::new_v4(),
125 profile,
126 test_list,
127 test_threads,
128 double_spawn,
129 target_runner,
130 capture_strategy: self.capture_strategy,
131 force_retries: self.retries,
132 cli_args,
133 max_fail,
134 stress_condition: self.stress_condition,
135 runtime,
136 },
137 signal_handler,
138 input_handler,
139 })
140 }
141}
142
/// Condition attached to a stress run.
///
/// The runner stores this and passes a clone to the dispatcher; while it is
/// set, the runner keeps starting stress sub-runs until the dispatcher's
/// stress progress reports nothing remaining or the run is cancelled.
#[derive(Clone, Debug)]
pub enum StressCondition {
    /// A condition expressed as a [`StressCount`].
    Count(StressCount),

    /// A condition expressed as a [`Duration`].
    // NOTE(review): presumably the total time to keep running sub-runs; the
    // interpretation lives in the dispatcher -- confirm there.
    Duration(Duration),
}
152
/// A count used by [`StressCondition::Count`]: either a nonzero number or
/// infinite.
///
/// The `FromStr` implementation accepts the literal string `"infinite"` or
/// anything that parses as a nonzero `u32`.
#[derive(Clone, Copy, Debug)]
pub enum StressCount {
    /// A specific, nonzero count.
    Count(NonZero<u32>),

    /// The infinite variant; produced by parsing the string `"infinite"`.
    Infinite,
}
162
163impl FromStr for StressCount {
164 type Err = StressCountParseError;
165
166 fn from_str(s: &str) -> Result<Self, Self::Err> {
167 if s == "infinite" {
168 Ok(StressCount::Infinite)
169 } else {
170 match s.parse() {
171 Ok(count) => Ok(StressCount::Count(count)),
172 Err(_) => Err(StressCountParseError::new(s)),
173 }
174 }
175 }
176}
177
/// A fully configured test runner, created by [`TestRunnerBuilder::build`].
///
/// Consumed by [`TestRunner::execute`] or [`TestRunner::try_execute`].
#[derive(Debug)]
pub struct TestRunner<'a> {
    /// Run configuration together with the Tokio runtime the run executes on.
    inner: TestRunnerInner<'a>,
    /// Signal handler built from the `SignalHandlerKind` given to the builder.
    signal_handler: SignalHandler,
    /// Input handler built from the `InputHandlerKind` given to the builder.
    input_handler: InputHandler,
}
187
188impl<'a> TestRunner<'a> {
189 pub fn input_handler_status(&self) -> InputHandlerStatus {
191 self.input_handler.status()
192 }
193
194 pub fn execute<F>(
200 self,
201 mut callback: F,
202 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
203 where
204 F: FnMut(TestEvent<'a>) + Send,
205 {
206 self.try_execute::<Infallible, _>(|test_event| {
207 callback(test_event);
208 Ok(())
209 })
210 }
211
212 pub fn try_execute<E, F>(
219 mut self,
220 mut callback: F,
221 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
222 where
223 F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
224 E: fmt::Debug + Send,
225 {
226 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
227
228 let mut report_cancel_tx = Some(report_cancel_tx);
232 let mut first_error = None;
233
234 let res = self.inner.execute(
235 &mut self.signal_handler,
236 &mut self.input_handler,
237 report_cancel_rx,
238 |event| {
239 match callback(event) {
240 Ok(()) => {}
241 Err(error) => {
242 if let Some(report_cancel_tx) = report_cancel_tx.take() {
246 let _ = report_cancel_tx.send(());
247 first_error = Some(error);
248 }
249 }
250 }
251 },
252 );
253
254 self.inner.runtime.shutdown_background();
258
259 match (res, first_error) {
260 (Ok(run_stats), None) => Ok(run_stats),
261 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
262 report_error: Some(report_error),
263 join_errors: Vec::new(),
264 }),
265 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
266 report_error,
267 join_errors,
268 }),
269 }
270 }
271}
272
/// Run state held by [`TestRunner`]: everything except the signal and input
/// handlers, which are passed to `execute` separately as mutable references.
#[derive(Debug)]
struct TestRunnerInner<'a> {
    /// Identifier for this run, generated with `ReportUuid::new_v4` at build time.
    run_id: ReportUuid,
    /// Profile supplying the name, global timeout, test-group config and
    /// test priority ordering.
    profile: &'a EvaluatableProfile<'a>,
    /// The tests to run.
    test_list: &'a TestList<'a>,
    /// Global thread limit handed to the future queue; also the basis for
    /// computing each test's required threads.
    test_threads: usize,
    /// Double-spawn info, cloned into the executor context.
    double_spawn: DoubleSpawnInfo,
    /// Target runner, cloned into the executor context.
    target_runner: TargetRunner,
    /// Output capture strategy passed to the executor context.
    capture_strategy: CaptureStrategy,
    /// Retry policy override from the builder, if one was set.
    force_retries: Option<RetryPolicy>,
    /// Command-line arguments, cloned into the dispatcher context.
    cli_args: Vec<String>,
    /// Max-fail setting passed to the dispatcher context.
    max_fail: MaxFail,
    /// Stress condition; `Some` selects the stress sub-run loop in `execute`.
    stress_condition: Option<StressCondition>,
    /// Tokio runtime on which the run's tasks are spawned.
    runtime: Runtime,
}
288
289impl<'a> TestRunnerInner<'a> {
290 fn execute<F>(
291 &self,
292 signal_handler: &mut SignalHandler,
293 input_handler: &mut InputHandler,
294 report_cancel_rx: oneshot::Receiver<()>,
295 callback: F,
296 ) -> Result<RunStats, Vec<JoinError>>
297 where
298 F: FnMut(TestEvent<'a>) + Send,
299 {
300 let mut dispatcher_cx = DispatcherContext::new(
303 callback,
304 self.run_id,
305 self.profile.name(),
306 self.cli_args.clone(),
307 self.test_list.run_count(),
308 self.max_fail,
309 self.profile.global_timeout().period,
310 self.stress_condition.clone(),
311 );
312
313 let executor_cx = ExecutorContext::new(
314 self.run_id,
315 self.profile,
316 self.test_list,
317 self.double_spawn.clone(),
318 self.target_runner.clone(),
319 self.capture_strategy,
320 self.force_retries,
321 );
322
323 dispatcher_cx.run_started(self.test_list);
325
326 let _guard = self.runtime.enter();
327
328 let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());
329
330 if self.stress_condition.is_some() {
331 loop {
332 let progress = dispatcher_cx
333 .stress_progress()
334 .expect("stress_condition is Some => stress progress is Some");
335 if progress.remaining().is_some() {
336 dispatcher_cx.stress_sub_run_started(progress);
337
338 self.do_run(
339 dispatcher_cx.stress_index(),
340 &mut dispatcher_cx,
341 &executor_cx,
342 signal_handler,
343 input_handler,
344 report_cancel_rx.as_mut(),
345 )?;
346
347 dispatcher_cx.stress_sub_run_finished();
348
349 if dispatcher_cx.cancel_reason().is_some() {
350 break;
351 }
352 } else {
353 break;
354 }
355 }
356 } else {
357 self.do_run(
358 None,
359 &mut dispatcher_cx,
360 &executor_cx,
361 signal_handler,
362 input_handler,
363 report_cancel_rx,
364 )?;
365 }
366
367 dispatcher_cx.run_finished();
368
369 Ok(dispatcher_cx.run_stats())
370 }
371
372 fn do_run<F>(
373 &self,
374 stress_index: Option<StressIndex>,
375 dispatcher_cx: &mut DispatcherContext<'a, F>,
376 executor_cx: &ExecutorContext<'a>,
377 signal_handler: &mut SignalHandler,
378 input_handler: &mut InputHandler,
379 report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
380 ) -> Result<(), Vec<JoinError>>
381 where
382 F: FnMut(TestEvent<'a>) + Send,
383 {
384 let ((), results) = TokioScope::scope_and_block(move |scope| {
385 let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();
386
387 let dispatcher_fut =
389 dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
390 scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
391
392 let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
393 let script_resp_tx = resp_tx.clone();
394 let run_scripts_fut = async move {
395 let script_data = executor_cx
398 .run_setup_scripts(stress_index, script_resp_tx)
399 .await;
400 if script_tx.send(script_data).is_err() {
401 debug!("script_tx.send failed, shutting down");
403 }
404 RunnerTaskState::finished_no_children()
405 };
406 scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);
407
408 let Some(script_data) = script_rx.blocking_recv() else {
409 debug!("no script data received, shutting down");
411 return;
412 };
413
414 let groups = self
416 .profile
417 .test_group_config()
418 .iter()
419 .map(|(group_name, config)| (group_name, config.max_threads.compute()));
420
421 let setup_script_data = Arc::new(script_data);
422
423 let filter_resp_tx = resp_tx.clone();
424
425 let tests = self.test_list.to_priority_queue(self.profile);
426 let run_tests_fut = futures::stream::iter(tests)
427 .filter_map(move |test| {
428 let filter_resp_tx = filter_resp_tx.clone();
436 async move {
437 if let FilterMatch::Mismatch { reason } =
438 test.instance.test_info.filter_match
439 {
440 let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
442 stress_index,
443 test_instance: test.instance,
444 reason,
445 });
446 return None;
447 }
448 Some(test)
449 }
450 })
451 .map(move |test: TestInstanceWithSettings<'a>| {
452 let threads_required =
453 test.settings.threads_required().compute(self.test_threads);
454 let test_group = match test.settings.test_group() {
455 TestGroup::Global => None,
456 TestGroup::Custom(name) => Some(name.clone()),
457 };
458 let resp_tx = resp_tx.clone();
459 let setup_script_data = setup_script_data.clone();
460
461 let test_instance = test.instance;
462
463 let f = move |cx: FutureQueueContext| {
464 debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
465 async move {
471 let ((), mut ret) = unsafe {
485 TokioScope::scope_and_collect(move |scope| {
486 scope.spawn(executor_cx.run_test_instance(
487 stress_index,
488 test,
489 cx,
490 resp_tx.clone(),
491 setup_script_data,
492 ))
493 })
494 }
495 .await;
496
497 let Some(result) = ret.pop() else {
500 warn!(
501 "no task was started for test instance: {}",
502 test_instance.id()
503 );
504 return None;
505 };
506 result.err()
507 }
508 };
509
510 (threads_required, test_group, f)
511 })
512 .future_queue_grouped(self.test_threads, groups)
515 .filter_map(std::future::ready)
517 .collect::<Vec<_>>()
518 .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });
523
524 scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
525 });
526
527 let mut cancelled_count = 0;
534 let join_errors = results
535 .into_iter()
536 .flat_map(|r| {
537 match r {
538 Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
539 Ok(RunnerTaskState::Cancelled) => {
542 cancelled_count += 1;
543 Vec::new()
544 }
545 Err(join_error) => vec![join_error],
546 }
547 })
548 .collect::<Vec<_>>();
549
550 if cancelled_count > 0 {
551 debug!(
552 "{} tasks were cancelled -- this \
553 generally should only happen due to panics",
554 cancelled_count
555 );
556 }
557 if !join_errors.is_empty() {
558 return Err(join_errors);
559 }
560
561 Ok(())
562 }
563}
564
565pub fn configure_handle_inheritance(
578 no_capture: bool,
579) -> Result<(), ConfigureHandleInheritanceError> {
580 super::os::configure_handle_inheritance_impl(no_capture)
581}
582
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    /// With `CaptureStrategy::None`, a requested thread count of 20 is ignored
    /// and the built runner uses a single test thread.
    #[test]
    fn no_capture_settings() {
        // Inputs for `build`: an empty test list and the default profile.
        let test_list = TestList::empty();
        let config = NextestConfig::default_config("/fake/dir");
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let profile = config
            .profile(NextestConfig::DEFAULT_PROFILE)
            .unwrap()
            .apply_build_platforms(&build_platforms);

        let mut builder = TestRunnerBuilder::default();
        builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));

        let runner = builder
            .build(
                &test_list,
                &profile,
                Vec::new(),
                SignalHandlerKind::Noop,
                InputHandlerKind::Noop,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();

        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }
}