1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 core::EvaluatableProfile,
8 elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9 scripts::SetupScriptExecuteData,
10 },
11 double_spawn::DoubleSpawnInfo,
12 errors::{
13 ConfigureHandleInheritanceError, StressCountParseError, TestRunnerBuildError,
14 TestRunnerExecuteErrors,
15 },
16 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17 list::{TestInstanceWithSettings, TestList},
18 reporter::events::{RunStats, StressIndex, TestEvent},
19 runner::ExecutorEvent,
20 signal::{SignalHandler, SignalHandlerKind},
21 target_runner::TargetRunner,
22 test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use future_queue::{FutureQueueContext, StreamExt};
26use futures::{future::Fuse, prelude::*};
27use nextest_metadata::FilterMatch;
28use quick_junit::ReportUuid;
29use std::{
30 convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr, sync::Arc, time::Duration,
31};
32use tokio::{
33 runtime::Runtime,
34 sync::{mpsc::unbounded_channel, oneshot},
35 task::JoinError,
36};
37use tracing::{debug, warn};
38
39#[derive(Debug, Default)]
41pub struct TestRunnerBuilder {
42 capture_strategy: CaptureStrategy,
43 retries: Option<RetryPolicy>,
44 max_fail: Option<MaxFail>,
45 test_threads: Option<TestThreads>,
46 stress_condition: Option<StressCondition>,
47}
48
49impl TestRunnerBuilder {
50 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
61 self.capture_strategy = strategy;
62 self
63 }
64
65 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
67 self.retries = Some(retries);
68 self
69 }
70
71 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
73 self.max_fail = Some(max_fail);
74 self
75 }
76
77 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
79 self.test_threads = Some(test_threads);
80 self
81 }
82
83 pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
85 self.stress_condition = Some(stress_condition);
86 self
87 }
88
89 #[expect(clippy::too_many_arguments)]
91 pub fn build<'a>(
92 self,
93 test_list: &'a TestList,
94 profile: &'a EvaluatableProfile<'a>,
95 cli_args: Vec<String>,
96 signal_handler: SignalHandlerKind,
97 input_handler: InputHandlerKind,
98 double_spawn: DoubleSpawnInfo,
99 target_runner: TargetRunner,
100 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
101 let test_threads = match self.capture_strategy {
102 CaptureStrategy::None => 1,
103 CaptureStrategy::Combined | CaptureStrategy::Split => self
104 .test_threads
105 .unwrap_or_else(|| profile.test_threads())
106 .compute(),
107 };
108 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
109
110 let runtime = tokio::runtime::Builder::new_multi_thread()
111 .enable_all()
112 .thread_name("nextest-runner-worker")
113 .build()
114 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
115 let _guard = runtime.enter();
116
117 let signal_handler = signal_handler.build()?;
119
120 let input_handler = input_handler.build();
121
122 Ok(TestRunner {
123 inner: TestRunnerInner {
124 run_id: ReportUuid::new_v4(),
125 profile,
126 test_list,
127 test_threads,
128 double_spawn,
129 target_runner,
130 capture_strategy: self.capture_strategy,
131 force_retries: self.retries,
132 cli_args,
133 max_fail,
134 stress_condition: self.stress_condition,
135 runtime,
136 },
137 signal_handler,
138 input_handler,
139 })
140 }
141}
142
143#[derive(Clone, Debug)]
145pub enum StressCondition {
146 Count(StressCount),
148
149 Duration(Duration),
151}
152
153#[derive(Clone, Copy, Debug)]
155pub enum StressCount {
156 Count(NonZero<u32>),
158
159 Infinite,
161}
162
163impl FromStr for StressCount {
164 type Err = StressCountParseError;
165
166 fn from_str(s: &str) -> Result<Self, Self::Err> {
167 if s == "infinite" {
168 Ok(StressCount::Infinite)
169 } else {
170 match s.parse() {
171 Ok(count) => Ok(StressCount::Count(count)),
172 Err(_) => Err(StressCountParseError::new(s)),
173 }
174 }
175 }
176}
177
178#[derive(Debug)]
182pub struct TestRunner<'a> {
183 inner: TestRunnerInner<'a>,
184 signal_handler: SignalHandler,
185 input_handler: InputHandler,
186}
187
188impl<'a> TestRunner<'a> {
189 pub fn input_handler_status(&self) -> InputHandlerStatus {
191 self.input_handler.status()
192 }
193
194 pub fn execute<F>(
200 self,
201 mut callback: F,
202 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
203 where
204 F: FnMut(TestEvent<'a>) + Send,
205 {
206 self.try_execute::<Infallible, _>(|test_event| {
207 callback(test_event);
208 Ok(())
209 })
210 }
211
212 pub fn try_execute<E, F>(
219 mut self,
220 mut callback: F,
221 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
222 where
223 F: FnMut(TestEvent<'a>) -> Result<(), E> + Send,
224 E: fmt::Debug + Send,
225 {
226 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
227
228 let mut report_cancel_tx = Some(report_cancel_tx);
232 let mut first_error = None;
233
234 let res = self.inner.execute(
235 &mut self.signal_handler,
236 &mut self.input_handler,
237 report_cancel_rx,
238 |event| {
239 match callback(event) {
240 Ok(()) => {}
241 Err(error) => {
242 if let Some(report_cancel_tx) = report_cancel_tx.take() {
246 let _ = report_cancel_tx.send(());
247 first_error = Some(error);
248 }
249 }
250 }
251 },
252 );
253
254 self.inner.runtime.shutdown_background();
258
259 match (res, first_error) {
260 (Ok(run_stats), None) => Ok(run_stats),
261 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
262 report_error: Some(report_error),
263 join_errors: Vec::new(),
264 }),
265 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
266 report_error,
267 join_errors,
268 }),
269 }
270 }
271}
272
273#[derive(Debug)]
274struct TestRunnerInner<'a> {
275 run_id: ReportUuid,
276 profile: &'a EvaluatableProfile<'a>,
277 test_list: &'a TestList<'a>,
278 test_threads: usize,
279 double_spawn: DoubleSpawnInfo,
280 target_runner: TargetRunner,
281 capture_strategy: CaptureStrategy,
282 force_retries: Option<RetryPolicy>,
283 cli_args: Vec<String>,
284 max_fail: MaxFail,
285 stress_condition: Option<StressCondition>,
286 runtime: Runtime,
287}
288
289impl<'a> TestRunnerInner<'a> {
290 fn execute<F>(
291 &self,
292 signal_handler: &mut SignalHandler,
293 input_handler: &mut InputHandler,
294 report_cancel_rx: oneshot::Receiver<()>,
295 callback: F,
296 ) -> Result<RunStats, Vec<JoinError>>
297 where
298 F: FnMut(TestEvent<'a>) + Send,
299 {
300 let mut dispatcher_cx = DispatcherContext::new(
303 callback,
304 self.run_id,
305 self.profile.name(),
306 self.cli_args.clone(),
307 self.test_list.run_count(),
308 self.max_fail,
309 self.profile.global_timeout().period,
310 self.stress_condition.clone(),
311 );
312
313 let executor_cx = ExecutorContext::new(
314 self.run_id,
315 self.profile,
316 self.test_list,
317 self.double_spawn.clone(),
318 self.target_runner.clone(),
319 self.capture_strategy,
320 self.force_retries,
321 );
322
323 dispatcher_cx.run_started(self.test_list, self.test_threads);
325
326 let _guard = self.runtime.enter();
327
328 let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());
329
330 if self.stress_condition.is_some() {
331 loop {
332 let progress = dispatcher_cx
333 .stress_progress()
334 .expect("stress_condition is Some => stress progress is Some");
335 if progress.remaining().is_some() {
336 dispatcher_cx.stress_sub_run_started(progress);
337
338 self.do_run(
339 dispatcher_cx.stress_index(),
340 &mut dispatcher_cx,
341 &executor_cx,
342 signal_handler,
343 input_handler,
344 report_cancel_rx.as_mut(),
345 )?;
346
347 dispatcher_cx.stress_sub_run_finished();
348
349 if dispatcher_cx.cancel_reason().is_some() {
350 break;
351 }
352 } else {
353 break;
354 }
355 }
356 } else {
357 self.do_run(
358 None,
359 &mut dispatcher_cx,
360 &executor_cx,
361 signal_handler,
362 input_handler,
363 report_cancel_rx,
364 )?;
365 }
366
367 let run_stats = dispatcher_cx.run_stats();
368 dispatcher_cx.run_finished();
369
370 Ok(run_stats)
371 }
372
373 fn do_run<F>(
374 &self,
375 stress_index: Option<StressIndex>,
376 dispatcher_cx: &mut DispatcherContext<'a, F>,
377 executor_cx: &ExecutorContext<'a>,
378 signal_handler: &mut SignalHandler,
379 input_handler: &mut InputHandler,
380 report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
381 ) -> Result<(), Vec<JoinError>>
382 where
383 F: FnMut(TestEvent<'a>) + Send,
384 {
385 let ((), results) = TokioScope::scope_and_block(move |scope| {
386 let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();
387
388 let dispatcher_fut =
390 dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
391 scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
392
393 let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
394 let script_resp_tx = resp_tx.clone();
395 let run_scripts_fut = async move {
396 let script_data = executor_cx
399 .run_setup_scripts(stress_index, script_resp_tx)
400 .await;
401 if script_tx.send(script_data).is_err() {
402 debug!("script_tx.send failed, shutting down");
404 }
405 RunnerTaskState::finished_no_children()
406 };
407 scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);
408
409 let Some(script_data) = script_rx.blocking_recv() else {
410 debug!("no script data received, shutting down");
412 return;
413 };
414
415 let groups = self
417 .profile
418 .test_group_config()
419 .iter()
420 .map(|(group_name, config)| (group_name, config.max_threads.compute()));
421
422 let setup_script_data = Arc::new(script_data);
423
424 let filter_resp_tx = resp_tx.clone();
425
426 let tests = self.test_list.to_priority_queue(self.profile);
427 let run_tests_fut = futures::stream::iter(tests)
428 .filter_map(move |test| {
429 let filter_resp_tx = filter_resp_tx.clone();
437 async move {
438 if let FilterMatch::Mismatch { reason } =
439 test.instance.test_info.filter_match
440 {
441 let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
443 stress_index,
444 test_instance: test.instance,
445 reason,
446 });
447 return None;
448 }
449 Some(test)
450 }
451 })
452 .map(move |test: TestInstanceWithSettings<'a>| {
453 let threads_required =
454 test.settings.threads_required().compute(self.test_threads);
455 let test_group = match test.settings.test_group() {
456 TestGroup::Global => None,
457 TestGroup::Custom(name) => Some(name.clone()),
458 };
459 let resp_tx = resp_tx.clone();
460 let setup_script_data = setup_script_data.clone();
461
462 let test_instance = test.instance;
463
464 let f = move |cx: FutureQueueContext| {
465 debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
466 async move {
472 let ((), mut ret) = unsafe {
486 TokioScope::scope_and_collect(move |scope| {
487 scope.spawn(executor_cx.run_test_instance(
488 stress_index,
489 test,
490 cx,
491 resp_tx.clone(),
492 setup_script_data,
493 ))
494 })
495 }
496 .await;
497
498 let Some(result) = ret.pop() else {
501 warn!(
502 "no task was started for test instance: {}",
503 test_instance.id()
504 );
505 return None;
506 };
507 result.err()
508 }
509 };
510
511 (threads_required, test_group, f)
512 })
513 .future_queue_grouped(self.test_threads, groups)
516 .filter_map(std::future::ready)
518 .collect::<Vec<_>>()
519 .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });
524
525 scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
526 });
527
528 let mut cancelled_count = 0;
535 let join_errors = results
536 .into_iter()
537 .flat_map(|r| {
538 match r {
539 Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
540 Ok(RunnerTaskState::Cancelled) => {
543 cancelled_count += 1;
544 Vec::new()
545 }
546 Err(join_error) => vec![join_error],
547 }
548 })
549 .collect::<Vec<_>>();
550
551 if cancelled_count > 0 {
552 debug!(
553 "{} tasks were cancelled -- this \
554 generally should only happen due to panics",
555 cancelled_count
556 );
557 }
558 if !join_errors.is_empty() {
559 return Err(join_errors);
560 }
561
562 Ok(())
563 }
564}
565
566pub fn configure_handle_inheritance(
579 no_capture: bool,
580) -> Result<(), ConfigureHandleInheritanceError> {
581 super::os::configure_handle_inheritance_impl(no_capture)
582}
583
584#[cfg(test)]
585mod tests {
586 use super::*;
587 use crate::{config::core::NextestConfig, platform::BuildPlatforms};
588
589 #[test]
590 fn no_capture_settings() {
591 let mut builder = TestRunnerBuilder::default();
593 builder
594 .set_capture_strategy(CaptureStrategy::None)
595 .set_test_threads(TestThreads::Count(20));
596 let test_list = TestList::empty();
597 let config = NextestConfig::default_config("/fake/dir");
598 let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
599 let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
600 let signal_handler = SignalHandlerKind::Noop;
601 let input_handler = InputHandlerKind::Noop;
602 let profile = profile.apply_build_platforms(&build_platforms);
603 let runner = builder
604 .build(
605 &test_list,
606 &profile,
607 vec![],
608 signal_handler,
609 input_handler,
610 DoubleSpawnInfo::disabled(),
611 TargetRunner::empty(),
612 )
613 .unwrap();
614 assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
615 assert_eq!(runner.inner.test_threads, 1, "tests run serially");
616 }
617}