1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 core::EvaluatableProfile,
8 elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9 scripts::SetupScriptExecuteData,
10 },
11 double_spawn::DoubleSpawnInfo,
12 errors::{
13 ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14 TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15 },
16 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17 list::{TestInstanceWithSettings, TestList},
18 reporter::events::{ReporterEvent, RunStats, StressIndex},
19 runner::ExecutorEvent,
20 signal::{SignalHandler, SignalHandlerKind},
21 target_runner::TargetRunner,
22 test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use future_queue::{FutureQueueContext, StreamExt};
26use futures::{future::Fuse, prelude::*};
27use nextest_metadata::FilterMatch;
28use quick_junit::ReportUuid;
29use std::{
30 convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr, sync::Arc, time::Duration,
31};
32use tokio::{
33 runtime::Runtime,
34 sync::{mpsc::unbounded_channel, oneshot},
35 task::JoinError,
36};
37use tracing::{debug, warn};
38
/// A debugger invocation, split into the executable and its leading
/// arguments.
///
/// Values are normally obtained by parsing a command line through the
/// [`FromStr`] implementation for this type.
#[derive(Debug, Clone)]
pub struct DebuggerCommand {
    /// The executable to launch: the first word of the parsed command line.
    program: String,
    /// Every word after the program, in their original order.
    args: Vec<String>,
}
45
46impl DebuggerCommand {
47 pub fn program(&self) -> &str {
49 &self.program
51 }
52
53 pub fn args(&self) -> &[String] {
55 &self.args
56 }
57}
58
59impl FromStr for DebuggerCommand {
60 type Err = DebuggerCommandParseError;
61
62 fn from_str(command: &str) -> Result<Self, Self::Err> {
63 let mut parts =
64 shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
65 if parts.is_empty() {
66 return Err(DebuggerCommandParseError::EmptyCommand);
67 }
68 let program = parts.remove(0);
69 Ok(Self {
70 program,
71 args: parts,
72 })
73 }
74}
75
/// A tracer invocation, split into the executable and its leading arguments.
///
/// Values are normally obtained by parsing a command line through the
/// [`FromStr`] implementation for this type.
#[derive(Debug, Clone)]
pub struct TracerCommand {
    /// The executable to launch: the first word of the parsed command line.
    program: String,
    /// Every word after the program, in their original order.
    args: Vec<String>,
}
82
83impl TracerCommand {
84 pub fn program(&self) -> &str {
86 &self.program
87 }
88
89 pub fn args(&self) -> &[String] {
91 &self.args
92 }
93}
94
95impl FromStr for TracerCommand {
96 type Err = TracerCommandParseError;
97
98 fn from_str(command: &str) -> Result<Self, Self::Err> {
99 let mut parts =
100 shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
101 if parts.is_empty() {
102 return Err(TracerCommandParseError::EmptyCommand);
103 }
104 let program = parts.remove(0);
105 Ok(Self {
106 program,
107 args: parts,
108 })
109 }
110}
111
112#[derive(Clone, Debug, Default)]
114pub enum Interceptor {
115 #[default]
117 None,
118
119 Debugger(DebuggerCommand),
121
122 Tracer(TracerCommand),
124}
125
126impl Interceptor {
127 pub fn should_disable_timeouts(&self) -> bool {
131 match self {
132 Interceptor::None => false,
133 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
134 }
135 }
136
137 pub fn should_passthrough_stdin(&self) -> bool {
141 match self {
142 Interceptor::None | Interceptor::Tracer(_) => false,
143 Interceptor::Debugger(_) => true,
144 }
145 }
146
147 pub fn should_create_process_group(&self) -> bool {
152 match self {
153 Interceptor::None | Interceptor::Tracer(_) => true,
154 Interceptor::Debugger(_) => false,
155 }
156 }
157
158 pub fn should_skip_leak_detection(&self) -> bool {
162 match self {
163 Interceptor::None => false,
164 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
165 }
166 }
167
168 pub fn should_show_wrapper_command(&self) -> bool {
172 match self {
173 Interceptor::None => false,
174 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
175 }
176 }
177
178 pub fn should_send_sigtstp(&self) -> bool {
185 match self {
186 Interceptor::None | Interceptor::Tracer(_) => true,
187 Interceptor::Debugger(_) => false,
188 }
189 }
190}
191
192#[derive(Copy, Clone, Debug)]
194pub(super) enum ChildPid {
195 Process(#[cfg_attr(not(unix), expect(unused))] u32),
197
198 #[cfg(unix)]
200 ProcessGroup(u32),
201}
202
203impl ChildPid {
204 #[cfg(unix)]
211 pub(super) fn for_kill(self) -> i32 {
212 match self {
213 ChildPid::Process(pid) => pid as i32,
214 ChildPid::ProcessGroup(pid) => -(pid as i32),
215 }
216 }
217}
218
219#[derive(Debug, Default)]
221pub struct TestRunnerBuilder {
222 capture_strategy: CaptureStrategy,
223 retries: Option<RetryPolicy>,
224 max_fail: Option<MaxFail>,
225 test_threads: Option<TestThreads>,
226 stress_condition: Option<StressCondition>,
227 interceptor: Interceptor,
228}
229
230impl TestRunnerBuilder {
231 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
242 self.capture_strategy = strategy;
243 self
244 }
245
246 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
248 self.retries = Some(retries);
249 self
250 }
251
252 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
254 self.max_fail = Some(max_fail);
255 self
256 }
257
258 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
260 self.test_threads = Some(test_threads);
261 self
262 }
263
264 pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
266 self.stress_condition = Some(stress_condition);
267 self
268 }
269
270 pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
272 self.interceptor = interceptor;
273 self
274 }
275
276 #[expect(clippy::too_many_arguments)]
278 pub fn build<'a>(
279 self,
280 test_list: &'a TestList,
281 profile: &'a EvaluatableProfile<'a>,
282 cli_args: Vec<String>,
283 signal_handler: SignalHandlerKind,
284 input_handler: InputHandlerKind,
285 double_spawn: DoubleSpawnInfo,
286 target_runner: TargetRunner,
287 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
288 let test_threads = match self.capture_strategy {
289 CaptureStrategy::None => 1,
290 CaptureStrategy::Combined | CaptureStrategy::Split => self
291 .test_threads
292 .unwrap_or_else(|| profile.test_threads())
293 .compute(),
294 };
295 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
296
297 let runtime = tokio::runtime::Builder::new_multi_thread()
298 .enable_all()
299 .thread_name("nextest-runner-worker")
300 .build()
301 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
302 let _guard = runtime.enter();
303
304 let signal_handler = signal_handler.build()?;
306
307 let input_handler = input_handler.build();
308
309 Ok(TestRunner {
310 inner: TestRunnerInner {
311 run_id: ReportUuid::new_v4(),
312 profile,
313 test_list,
314 test_threads,
315 double_spawn,
316 target_runner,
317 capture_strategy: self.capture_strategy,
318 force_retries: self.retries,
319 cli_args,
320 max_fail,
321 stress_condition: self.stress_condition,
322 interceptor: self.interceptor,
323 runtime,
324 },
325 signal_handler,
326 input_handler,
327 })
328 }
329}
330
331#[derive(Clone, Debug)]
333pub enum StressCondition {
334 Count(StressCount),
336
337 Duration(Duration),
339}
340
/// A count for a stress run: either a fixed non-zero number or unbounded.
#[derive(Debug, Clone, Copy)]
pub enum StressCount {
    /// A specific, non-zero count.
    Count(NonZero<u32>),

    /// No upper bound; parsed from the literal string `infinite`.
    Infinite,
}
350
351impl FromStr for StressCount {
352 type Err = StressCountParseError;
353
354 fn from_str(s: &str) -> Result<Self, Self::Err> {
355 if s == "infinite" {
356 Ok(StressCount::Infinite)
357 } else {
358 match s.parse() {
359 Ok(count) => Ok(StressCount::Count(count)),
360 Err(_) => Err(StressCountParseError::new(s)),
361 }
362 }
363 }
364}
365
/// A configured test runner, ready to execute a test list.
///
/// Created with [`TestRunnerBuilder::build`] and consumed by
/// [`execute`](Self::execute) or [`try_execute`](Self::try_execute).
#[derive(Debug)]
pub struct TestRunner<'a> {
    /// Run configuration plus the Tokio runtime that drives the run.
    inner: TestRunnerInner<'a>,
    /// Signal handler, lent to the run for its duration.
    signal_handler: SignalHandler,
    /// Input handler, lent to the run for its duration.
    input_handler: InputHandler,
}
375
376impl<'a> TestRunner<'a> {
377 pub fn input_handler_status(&self) -> InputHandlerStatus {
379 self.input_handler.status()
380 }
381
382 pub fn execute<F>(
388 self,
389 mut callback: F,
390 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
391 where
392 F: FnMut(ReporterEvent<'a>) + Send,
393 {
394 self.try_execute::<Infallible, _>(|event| {
395 callback(event);
396 Ok(())
397 })
398 }
399
400 pub fn try_execute<E, F>(
407 mut self,
408 mut callback: F,
409 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
410 where
411 F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
412 E: fmt::Debug + Send,
413 {
414 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
415
416 let mut report_cancel_tx = Some(report_cancel_tx);
420 let mut first_error = None;
421
422 let res = self.inner.execute(
423 &mut self.signal_handler,
424 &mut self.input_handler,
425 report_cancel_rx,
426 |event| {
427 match callback(event) {
428 Ok(()) => {}
429 Err(error) => {
430 if let Some(report_cancel_tx) = report_cancel_tx.take() {
434 let _ = report_cancel_tx.send(());
435 first_error = Some(error);
436 }
437 }
438 }
439 },
440 );
441
442 self.inner.runtime.shutdown_background();
446
447 match (res, first_error) {
448 (Ok(run_stats), None) => Ok(run_stats),
449 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
450 report_error: Some(report_error),
451 join_errors: Vec::new(),
452 }),
453 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
454 report_error,
455 join_errors,
456 }),
457 }
458 }
459}
460
/// The run configuration and runtime behind a [`TestRunner`].
///
/// Kept separate from the signal and input handlers so that the handlers can
/// be borrowed mutably while this struct is borrowed immutably during a run.
#[derive(Debug)]
struct TestRunnerInner<'a> {
    /// Unique ID for this run, generated when the runner is built.
    run_id: ReportUuid,
    /// The profile that supplies defaults, timeouts and test-group settings.
    profile: &'a EvaluatableProfile<'a>,
    /// The tests to run.
    test_list: &'a TestList<'a>,
    /// Number of test threads; forced to 1 when output is not captured.
    test_threads: usize,
    /// Double-spawn configuration, handed to the executor.
    double_spawn: DoubleSpawnInfo,
    /// Target runner configuration, handed to the executor.
    target_runner: TargetRunner,
    /// How test output is captured.
    capture_strategy: CaptureStrategy,
    /// Retry policy override from the builder, if any.
    force_retries: Option<RetryPolicy>,
    /// Command-line arguments, handed to the dispatcher.
    cli_args: Vec<String>,
    /// The max-fail setting in effect for this run.
    max_fail: MaxFail,
    /// Stress condition; `None` means a single, regular run.
    stress_condition: Option<StressCondition>,
    /// Debugger/tracer wrapper configuration.
    interceptor: Interceptor,
    /// The Tokio runtime on which all run tasks are spawned.
    runtime: Runtime,
}
477
478impl<'a> TestRunnerInner<'a> {
479 fn execute<F>(
480 &self,
481 signal_handler: &mut SignalHandler,
482 input_handler: &mut InputHandler,
483 report_cancel_rx: oneshot::Receiver<()>,
484 callback: F,
485 ) -> Result<RunStats, Vec<JoinError>>
486 where
487 F: FnMut(ReporterEvent<'a>) + Send,
488 {
489 let global_timeout = if self.interceptor.should_disable_timeouts() {
493 crate::time::far_future_duration()
494 } else {
495 self.profile.global_timeout().period
496 };
497
498 let mut dispatcher_cx = DispatcherContext::new(
499 callback,
500 self.run_id,
501 self.profile.name(),
502 self.cli_args.clone(),
503 self.test_list.run_count(),
504 self.max_fail,
505 global_timeout,
506 self.stress_condition.clone(),
507 );
508
509 let executor_cx = ExecutorContext::new(
510 self.run_id,
511 self.profile,
512 self.test_list,
513 self.double_spawn.clone(),
514 self.target_runner.clone(),
515 self.capture_strategy,
516 self.force_retries,
517 self.interceptor.clone(),
518 );
519
520 dispatcher_cx.run_started(self.test_list, self.test_threads);
522
523 let _guard = self.runtime.enter();
524
525 let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());
526
527 if self.stress_condition.is_some() {
528 loop {
529 let progress = dispatcher_cx
530 .stress_progress()
531 .expect("stress_condition is Some => stress progress is Some");
532 if progress.remaining().is_some() {
533 dispatcher_cx.stress_sub_run_started(progress);
534
535 self.do_run(
536 dispatcher_cx.stress_index(),
537 &mut dispatcher_cx,
538 &executor_cx,
539 signal_handler,
540 input_handler,
541 report_cancel_rx.as_mut(),
542 )?;
543
544 dispatcher_cx.stress_sub_run_finished();
545
546 if dispatcher_cx.cancel_reason().is_some() {
547 break;
548 }
549 } else {
550 break;
551 }
552 }
553 } else {
554 self.do_run(
555 None,
556 &mut dispatcher_cx,
557 &executor_cx,
558 signal_handler,
559 input_handler,
560 report_cancel_rx,
561 )?;
562 }
563
564 let run_stats = dispatcher_cx.run_stats();
565 dispatcher_cx.run_finished();
566
567 Ok(run_stats)
568 }
569
570 fn do_run<F>(
571 &self,
572 stress_index: Option<StressIndex>,
573 dispatcher_cx: &mut DispatcherContext<'a, F>,
574 executor_cx: &ExecutorContext<'a>,
575 signal_handler: &mut SignalHandler,
576 input_handler: &mut InputHandler,
577 report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
578 ) -> Result<(), Vec<JoinError>>
579 where
580 F: FnMut(ReporterEvent<'a>) + Send,
581 {
582 let ((), results) = TokioScope::scope_and_block(move |scope| {
583 let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();
584
585 let dispatcher_fut =
587 dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
588 scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
589
590 let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
591 let script_resp_tx = resp_tx.clone();
592 let run_scripts_fut = async move {
593 let script_data = executor_cx
596 .run_setup_scripts(stress_index, script_resp_tx)
597 .await;
598 if script_tx.send(script_data).is_err() {
599 debug!("script_tx.send failed, shutting down");
601 }
602 RunnerTaskState::finished_no_children()
603 };
604 scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);
605
606 let Some(script_data) = script_rx.blocking_recv() else {
607 debug!("no script data received, shutting down");
609 return;
610 };
611
612 let groups = self
614 .profile
615 .test_group_config()
616 .iter()
617 .map(|(group_name, config)| (group_name, config.max_threads.compute()));
618
619 let setup_script_data = Arc::new(script_data);
620
621 let filter_resp_tx = resp_tx.clone();
622
623 let tests = self.test_list.to_priority_queue(self.profile);
624 let run_tests_fut = futures::stream::iter(tests)
625 .filter_map(move |test| {
626 let filter_resp_tx = filter_resp_tx.clone();
634 async move {
635 if let FilterMatch::Mismatch { reason } =
636 test.instance.test_info.filter_match
637 {
638 let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
640 stress_index,
641 test_instance: test.instance,
642 reason,
643 });
644 return None;
645 }
646 Some(test)
647 }
648 })
649 .map(move |test: TestInstanceWithSettings<'a>| {
650 let threads_required =
651 test.settings.threads_required().compute(self.test_threads);
652 let test_group = match test.settings.test_group() {
653 TestGroup::Global => None,
654 TestGroup::Custom(name) => Some(name.clone()),
655 };
656 let resp_tx = resp_tx.clone();
657 let setup_script_data = setup_script_data.clone();
658
659 let test_instance = test.instance;
660
661 let f = move |cx: FutureQueueContext| {
662 debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
663 async move {
669 let ((), mut ret) = unsafe {
683 TokioScope::scope_and_collect(move |scope| {
684 scope.spawn(executor_cx.run_test_instance(
685 stress_index,
686 test,
687 cx,
688 resp_tx.clone(),
689 setup_script_data,
690 ))
691 })
692 }
693 .await;
694
695 let Some(result) = ret.pop() else {
698 warn!(
699 "no task was started for test instance: {}",
700 test_instance.id()
701 );
702 return None;
703 };
704 result.err()
705 }
706 };
707
708 (threads_required, test_group, f)
709 })
710 .future_queue_grouped(self.test_threads, groups)
713 .filter_map(std::future::ready)
715 .collect::<Vec<_>>()
716 .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });
721
722 scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
723 });
724
725 let mut cancelled_count = 0;
732 let join_errors = results
733 .into_iter()
734 .flat_map(|r| {
735 match r {
736 Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
737 Ok(RunnerTaskState::Cancelled) => {
740 cancelled_count += 1;
741 Vec::new()
742 }
743 Err(join_error) => vec![join_error],
744 }
745 })
746 .collect::<Vec<_>>();
747
748 if cancelled_count > 0 {
749 debug!(
750 "{} tasks were cancelled -- this \
751 generally should only happen due to panics",
752 cancelled_count
753 );
754 }
755 if !join_errors.is_empty() {
756 return Err(join_errors);
757 }
758
759 Ok(())
760 }
761}
762
/// Configures handle inheritance for the current process.
///
/// This is a thin public wrapper: it forwards `no_capture` to the
/// platform-specific `super::os::configure_handle_inheritance_impl` and
/// returns its result unchanged.
///
/// # Errors
///
/// Returns a [`ConfigureHandleInheritanceError`] if the platform
/// implementation reports a failure.
// NOTE(review): the platform implementation is not visible here. Presumably
// this controls whether child test processes inherit the standard handles --
// confirm against the `os` module before relying on that.
pub fn configure_handle_inheritance(
    no_capture: bool,
) -> Result<(), ConfigureHandleInheritanceError> {
    super::os::configure_handle_inheritance_impl(no_capture)
}
780
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    /// With `CaptureStrategy::None`, the requested thread count must be
    /// ignored and tests must run serially.
    #[test]
    fn no_capture_settings() {
        let mut builder = TestRunnerBuilder::default();
        // Ask for 20 threads; no-capture mode must override this.
        builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));
        let test_list = TestList::empty();
        let config = NextestConfig::default_config("/fake/dir");
        let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let signal_handler = SignalHandlerKind::Noop;
        let input_handler = InputHandlerKind::Noop;
        let profile = profile.apply_build_platforms(&build_platforms);
        let runner = builder
            .build(
                &test_list,
                &profile,
                vec![],
                signal_handler,
                input_handler,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();
        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }

    /// Covers plain, multi-argument and quoted debugger commands, plus the
    /// empty-command and unparseable-command error paths.
    #[test]
    fn test_debugger_command_parsing() {
        // A program with a single argument.
        let cmd = DebuggerCommand::from_str("gdb --args").unwrap();
        assert_eq!(cmd.program(), "gdb");
        assert_eq!(cmd.args(), &["--args"]);

        // Several arguments keep their order.
        let cmd = DebuggerCommand::from_str("rust-gdb -ex run --args").unwrap();
        assert_eq!(cmd.program(), "rust-gdb");
        assert_eq!(cmd.args(), &["-ex", "run", "--args"]);

        // A quoted argument stays a single word.
        let cmd = DebuggerCommand::from_str(r#"gdb -ex "set print pretty on" --args"#).unwrap();
        assert_eq!(cmd.program(), "gdb");
        assert_eq!(cmd.args(), &["-ex", "set print pretty on", "--args"]);

        // An empty string has no program.
        let err = DebuggerCommand::from_str("").unwrap_err();
        assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));

        // Whitespace alone has no program either.
        let err = DebuggerCommand::from_str(" ").unwrap_err();
        assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));

        // An unterminated quote cannot be split into words.
        let err = DebuggerCommand::from_str(r#"gdb -ex "unterminated"#).unwrap_err();
        assert!(matches!(err, DebuggerCommandParseError::ShellWordsParse(_)));
    }

    /// Mirrors the debugger parsing test for `TracerCommand`, which has its
    /// own `FromStr` implementation and error type.
    #[test]
    fn test_tracer_command_parsing() {
        // A program on its own has no arguments.
        let cmd = TracerCommand::from_str("strace").unwrap();
        assert_eq!(cmd.program(), "strace");
        assert!(cmd.args().is_empty());

        // Several arguments keep their order.
        let cmd = TracerCommand::from_str("strace -f -o trace.log").unwrap();
        assert_eq!(cmd.program(), "strace");
        assert_eq!(cmd.args(), &["-f", "-o", "trace.log"]);

        // A quoted argument stays a single word.
        let cmd = TracerCommand::from_str(r#"strace -e "trace=open,close""#).unwrap();
        assert_eq!(cmd.program(), "strace");
        assert_eq!(cmd.args(), &["-e", "trace=open,close"]);

        // An empty string has no program.
        let err = TracerCommand::from_str("").unwrap_err();
        assert!(matches!(err, TracerCommandParseError::EmptyCommand));

        // Whitespace alone has no program either.
        let err = TracerCommand::from_str(" ").unwrap_err();
        assert!(matches!(err, TracerCommandParseError::EmptyCommand));

        // An unterminated quote cannot be split into words.
        let err = TracerCommand::from_str(r#"strace -e "unterminated"#).unwrap_err();
        assert!(matches!(err, TracerCommandParseError::ShellWordsParse(_)));
    }
}