1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 core::EvaluatableProfile,
8 elements::{FlakyResult, MaxFail, RetryPolicy, TestGroup, TestThreads},
9 scripts::SetupScriptExecuteData,
10 },
11 double_spawn::DoubleSpawnInfo,
12 errors::{
13 ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14 TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15 },
16 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17 list::{OwnedTestInstanceId, TestInstanceWithSettings, TestList},
18 reporter::events::{ReporterEvent, RunStats, StressIndex},
19 runner::ExecutorEvent,
20 signal::{SignalHandler, SignalHandlerKind},
21 target_runner::TargetRunner,
22 test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use chrono::{DateTime, Local};
26use future_queue::{FutureQueueContext, StreamExt};
27use futures::{future::Fuse, prelude::*};
28use nextest_metadata::FilterMatch;
29use quick_junit::ReportUuid;
30use semver::Version;
31use std::{
32 collections::BTreeSet, convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr,
33 sync::Arc, time::Duration,
34};
35use tokio::{
36 runtime::Runtime,
37 sync::{mpsc::unbounded_channel, oneshot},
38 task::JoinError,
39};
40use tracing::{debug, warn};
41
42#[derive(Clone, Debug)]
44pub struct DebuggerCommand {
45 program: String,
46 args: Vec<String>,
47}
48
49impl DebuggerCommand {
50 pub fn program(&self) -> &str {
52 &self.program
54 }
55
56 pub fn args(&self) -> &[String] {
58 &self.args
59 }
60}
61
62impl FromStr for DebuggerCommand {
63 type Err = DebuggerCommandParseError;
64
65 fn from_str(command: &str) -> Result<Self, Self::Err> {
66 let mut parts =
67 shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
68 if parts.is_empty() {
69 return Err(DebuggerCommandParseError::EmptyCommand);
70 }
71 let program = parts.remove(0);
72 Ok(Self {
73 program,
74 args: parts,
75 })
76 }
77}
78
79#[derive(Clone, Debug)]
81pub struct TracerCommand {
82 program: String,
83 args: Vec<String>,
84}
85
86impl TracerCommand {
87 pub fn program(&self) -> &str {
89 &self.program
90 }
91
92 pub fn args(&self) -> &[String] {
94 &self.args
95 }
96}
97
98impl FromStr for TracerCommand {
99 type Err = TracerCommandParseError;
100
101 fn from_str(command: &str) -> Result<Self, Self::Err> {
102 let mut parts =
103 shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
104 if parts.is_empty() {
105 return Err(TracerCommandParseError::EmptyCommand);
106 }
107 let program = parts.remove(0);
108 Ok(Self {
109 program,
110 args: parts,
111 })
112 }
113}
114
115#[derive(Clone, Debug, Default)]
117pub enum Interceptor {
118 #[default]
120 None,
121
122 Debugger(DebuggerCommand),
124
125 Tracer(TracerCommand),
127}
128
129impl Interceptor {
130 pub fn should_disable_timeouts(&self) -> bool {
134 match self {
135 Interceptor::None => false,
136 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
137 }
138 }
139
140 pub fn should_passthrough_stdin(&self) -> bool {
144 match self {
145 Interceptor::None | Interceptor::Tracer(_) => false,
146 Interceptor::Debugger(_) => true,
147 }
148 }
149
150 pub fn should_create_process_group(&self) -> bool {
155 match self {
156 Interceptor::None | Interceptor::Tracer(_) => true,
157 Interceptor::Debugger(_) => false,
158 }
159 }
160
161 pub fn should_skip_leak_detection(&self) -> bool {
165 match self {
166 Interceptor::None => false,
167 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
168 }
169 }
170
171 pub fn should_show_wrapper_command(&self) -> bool {
175 match self {
176 Interceptor::None => false,
177 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
178 }
179 }
180
181 pub fn should_send_sigtstp(&self) -> bool {
188 match self {
189 Interceptor::None | Interceptor::Tracer(_) => true,
190 Interceptor::Debugger(_) => false,
191 }
192 }
193}
194
195#[derive(Clone, Debug)]
200pub struct VersionEnvVars {
201 pub current_version: Version,
203
204 pub required_version: Option<Version>,
206
207 pub recommended_version: Option<Version>,
209}
210
211impl VersionEnvVars {
212 pub(super) fn apply_env(&self, cmd: &mut std::process::Command) {
214 cmd.env("NEXTEST_VERSION", self.current_version.to_string());
215 cmd.env(
216 "NEXTEST_REQUIRED_VERSION",
217 match &self.required_version {
218 Some(v) => v.to_string(),
219 None => "none".to_owned(),
220 },
221 );
222 cmd.env(
223 "NEXTEST_RECOMMENDED_VERSION",
224 match &self.recommended_version {
225 Some(v) => v.to_string(),
226 None => "none".to_owned(),
227 },
228 );
229 }
230}
231
232#[derive(Copy, Clone, Debug)]
234pub(super) enum ChildPid {
235 Process(#[cfg_attr(not(unix), expect(unused))] u32),
237
238 #[cfg(unix)]
240 ProcessGroup(u32),
241}
242
243impl ChildPid {
244 #[cfg(unix)]
251 pub(super) fn for_kill(self) -> i32 {
252 match self {
253 ChildPid::Process(pid) => pid as i32,
254 ChildPid::ProcessGroup(pid) => -(pid as i32),
255 }
256 }
257}
258
259#[derive(Debug, Default)]
261pub struct TestRunnerBuilder {
262 capture_strategy: CaptureStrategy,
263 retries: Option<RetryPolicy>,
264 flaky_result: Option<FlakyResult>,
265 max_fail: Option<MaxFail>,
266 test_threads: Option<TestThreads>,
267 stress_condition: Option<StressCondition>,
268 interceptor: Interceptor,
269 expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
270 version_env_vars: Option<VersionEnvVars>,
271}
272
273impl TestRunnerBuilder {
274 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
285 self.capture_strategy = strategy;
286 self
287 }
288
289 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
291 self.retries = Some(retries);
292 self
293 }
294
295 pub fn set_flaky_result(&mut self, flaky_result: FlakyResult) -> &mut Self {
297 self.flaky_result = Some(flaky_result);
298 self
299 }
300
301 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
303 self.max_fail = Some(max_fail);
304 self
305 }
306
307 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
309 self.test_threads = Some(test_threads);
310 self
311 }
312
313 pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
315 self.stress_condition = Some(stress_condition);
316 self
317 }
318
319 pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
321 self.interceptor = interceptor;
322 self
323 }
324
325 pub fn set_expected_outstanding(
331 &mut self,
332 expected: BTreeSet<OwnedTestInstanceId>,
333 ) -> &mut Self {
334 self.expected_outstanding = Some(expected);
335 self
336 }
337
338 pub fn set_version_env_vars(&mut self, version_env_vars: VersionEnvVars) -> &mut Self {
340 self.version_env_vars = Some(version_env_vars);
341 self
342 }
343
344 #[expect(clippy::too_many_arguments)]
346 pub fn build<'a>(
347 self,
348 test_list: &'a TestList,
349 profile: &'a EvaluatableProfile<'a>,
350 cli_args: Vec<String>,
351 signal_handler: SignalHandlerKind,
352 input_handler: InputHandlerKind,
353 double_spawn: DoubleSpawnInfo,
354 target_runner: TargetRunner,
355 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
356 let test_threads = match self.capture_strategy {
357 CaptureStrategy::None => 1,
358 CaptureStrategy::Combined | CaptureStrategy::Split => self
359 .test_threads
360 .unwrap_or_else(|| profile.test_threads())
361 .compute(),
362 };
363 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
364
365 let runtime = tokio::runtime::Builder::new_multi_thread()
366 .enable_all()
367 .thread_name("nextest-runner-worker")
368 .build()
369 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
370 let _guard = runtime.enter();
371
372 let signal_handler = signal_handler.build()?;
374
375 let input_handler = input_handler.build();
376
377 Ok(TestRunner {
378 inner: TestRunnerInner {
379 run_id: force_or_new_run_id(),
380 started_at: Local::now(),
381 profile,
382 test_list,
383 test_threads,
384 double_spawn,
385 target_runner,
386 capture_strategy: self.capture_strategy,
387 force_retries: self.retries,
388 force_flaky_result: self.flaky_result,
389 cli_args,
390 max_fail,
391 stress_condition: self.stress_condition,
392 interceptor: self.interceptor,
393 expected_outstanding: self.expected_outstanding,
394 version_env_vars: self.version_env_vars,
395 runtime,
396 },
397 signal_handler,
398 input_handler,
399 })
400 }
401}
402
403#[derive(Clone, Debug)]
405pub enum StressCondition {
406 Count(StressCount),
408
409 Duration(Duration),
411}
412
413#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
415#[serde(tag = "kind", rename_all = "kebab-case")]
416#[cfg_attr(test, derive(test_strategy::Arbitrary))]
417pub enum StressCount {
418 Count {
420 count: NonZero<u32>,
422 },
423
424 Infinite,
426}
427
428impl FromStr for StressCount {
429 type Err = StressCountParseError;
430
431 fn from_str(s: &str) -> Result<Self, Self::Err> {
432 if s == "infinite" {
433 Ok(StressCount::Infinite)
434 } else {
435 match s.parse() {
436 Ok(count) => Ok(StressCount::Count { count }),
437 Err(_) => Err(StressCountParseError::new(s)),
438 }
439 }
440 }
441}
442
443#[derive(Debug)]
447pub struct TestRunner<'a> {
448 inner: TestRunnerInner<'a>,
449 signal_handler: SignalHandler,
450 input_handler: InputHandler,
451}
452
453impl<'a> TestRunner<'a> {
454 pub fn run_id(&self) -> ReportUuid {
456 self.inner.run_id
457 }
458
459 pub fn started_at(&self) -> DateTime<Local> {
461 self.inner.started_at
462 }
463
464 pub fn input_handler_status(&self) -> InputHandlerStatus {
466 self.input_handler.status()
467 }
468
469 pub fn execute<F>(
475 self,
476 mut callback: F,
477 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
478 where
479 F: FnMut(ReporterEvent<'a>) + Send,
480 {
481 self.try_execute::<Infallible, _>(|event| {
482 callback(event);
483 Ok(())
484 })
485 }
486
487 pub fn try_execute<E, F>(
494 mut self,
495 mut callback: F,
496 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
497 where
498 F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
499 E: fmt::Debug + Send,
500 {
501 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
502
503 let mut report_cancel_tx = Some(report_cancel_tx);
507 let mut first_error = None;
508
509 let res = self.inner.execute(
510 &mut self.signal_handler,
511 &mut self.input_handler,
512 report_cancel_rx,
513 |event| {
514 match callback(event) {
515 Ok(()) => {}
516 Err(error) => {
517 if let Some(report_cancel_tx) = report_cancel_tx.take() {
521 let _ = report_cancel_tx.send(());
522 first_error = Some(error);
523 }
524 }
525 }
526 },
527 );
528
529 self.inner.runtime.shutdown_background();
533
534 match (res, first_error) {
535 (Ok(run_stats), None) => Ok(run_stats),
536 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
537 report_error: Some(report_error),
538 join_errors: Vec::new(),
539 }),
540 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
541 report_error,
542 join_errors,
543 }),
544 }
545 }
546}
547
548#[derive(Debug)]
549struct TestRunnerInner<'a> {
550 run_id: ReportUuid,
551 started_at: DateTime<Local>,
552 profile: &'a EvaluatableProfile<'a>,
553 test_list: &'a TestList<'a>,
554 test_threads: usize,
555 double_spawn: DoubleSpawnInfo,
556 target_runner: TargetRunner,
557 capture_strategy: CaptureStrategy,
558 force_retries: Option<RetryPolicy>,
559 force_flaky_result: Option<FlakyResult>,
560 cli_args: Vec<String>,
561 max_fail: MaxFail,
562 stress_condition: Option<StressCondition>,
563 interceptor: Interceptor,
564 expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
565 version_env_vars: Option<VersionEnvVars>,
566 runtime: Runtime,
567}
568
569impl<'a> TestRunnerInner<'a> {
570 fn execute<F>(
571 &self,
572 signal_handler: &mut SignalHandler,
573 input_handler: &mut InputHandler,
574 report_cancel_rx: oneshot::Receiver<()>,
575 callback: F,
576 ) -> Result<RunStats, Vec<JoinError>>
577 where
578 F: FnMut(ReporterEvent<'a>) + Send,
579 {
580 let global_timeout = if self.interceptor.should_disable_timeouts() {
584 crate::time::far_future_duration()
585 } else {
586 self.profile.global_timeout(self.test_list.mode()).period
587 };
588
589 let mut dispatcher_cx = DispatcherContext::new(
590 callback,
591 self.run_id,
592 self.profile.name(),
593 self.cli_args.clone(),
594 self.test_list.run_count(),
595 self.max_fail,
596 global_timeout,
597 self.stress_condition.clone(),
598 self.expected_outstanding.clone(),
599 );
600
601 let executor_cx = ExecutorContext::new(
602 self.run_id,
603 self.profile,
604 self.test_list,
605 self.test_threads,
606 self.double_spawn.clone(),
607 self.target_runner.clone(),
608 self.capture_strategy,
609 self.force_retries,
610 self.force_flaky_result,
611 self.interceptor.clone(),
612 self.version_env_vars.clone(),
613 );
614
615 dispatcher_cx.run_started(self.test_list, self.test_threads);
617
618 let _guard = self.runtime.enter();
619
620 let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());
621
622 if self.stress_condition.is_some() {
623 loop {
624 let progress = dispatcher_cx
625 .stress_progress()
626 .expect("stress_condition is Some => stress progress is Some");
627 if progress.remaining().is_some() {
628 dispatcher_cx.stress_sub_run_started(progress);
629
630 self.do_run(
631 dispatcher_cx.stress_index(),
632 &mut dispatcher_cx,
633 &executor_cx,
634 signal_handler,
635 input_handler,
636 report_cancel_rx.as_mut(),
637 )?;
638
639 dispatcher_cx.stress_sub_run_finished();
640
641 if dispatcher_cx.cancel_reason().is_some() {
642 break;
643 }
644 } else {
645 break;
646 }
647 }
648 } else {
649 self.do_run(
650 None,
651 &mut dispatcher_cx,
652 &executor_cx,
653 signal_handler,
654 input_handler,
655 report_cancel_rx,
656 )?;
657 }
658
659 let run_stats = dispatcher_cx.run_stats();
660 dispatcher_cx.run_finished();
661
662 Ok(run_stats)
663 }
664
665 fn do_run<F>(
666 &self,
667 stress_index: Option<StressIndex>,
668 dispatcher_cx: &mut DispatcherContext<'a, F>,
669 executor_cx: &ExecutorContext<'a>,
670 signal_handler: &mut SignalHandler,
671 input_handler: &mut InputHandler,
672 report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
673 ) -> Result<(), Vec<JoinError>>
674 where
675 F: FnMut(ReporterEvent<'a>) + Send,
676 {
677 let ((), results) = TokioScope::scope_and_block(move |scope| {
678 let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();
679
680 let dispatcher_fut =
682 dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
683 scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
684
685 let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
686 let script_resp_tx = resp_tx.clone();
687 let run_scripts_fut = async move {
688 let script_data = executor_cx
691 .run_setup_scripts(stress_index, script_resp_tx)
692 .await;
693 if script_tx.send(script_data).is_err() {
694 debug!("script_tx.send failed, shutting down");
696 }
697 RunnerTaskState::finished_no_children()
698 };
699 scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);
700
701 let Some(script_data) = script_rx.blocking_recv() else {
702 debug!("no script data received, shutting down");
704 return;
705 };
706
707 let groups = self
709 .profile
710 .test_group_config()
711 .iter()
712 .map(|(group_name, config)| (group_name, config.max_threads.compute()));
713
714 let setup_script_data = Arc::new(script_data);
715
716 let filter_resp_tx = resp_tx.clone();
717
718 let tests = self.test_list.to_priority_queue(self.profile);
719 let run_tests_fut = futures::stream::iter(tests)
720 .filter_map(move |test| {
721 let filter_resp_tx = filter_resp_tx.clone();
729 async move {
730 if let FilterMatch::Mismatch { reason } =
731 test.instance.test_info.filter_match
732 {
733 let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
735 stress_index,
736 test_instance: test.instance,
737 reason,
738 });
739 return None;
740 }
741 Some(test)
742 }
743 })
744 .map(move |test: TestInstanceWithSettings<'a>| {
745 let threads_required =
746 test.settings.threads_required().compute(self.test_threads);
747 let test_group = match test.settings.test_group() {
748 TestGroup::Global => None,
749 TestGroup::Custom(name) => Some(name.clone()),
750 };
751 let resp_tx = resp_tx.clone();
752 let setup_script_data = setup_script_data.clone();
753
754 let test_instance = test.instance;
755
756 let f = move |cx: FutureQueueContext| {
757 debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
758 async move {
764 let ((), mut ret) = unsafe {
778 TokioScope::scope_and_collect(move |scope| {
779 scope.spawn(executor_cx.run_test_instance(
780 stress_index,
781 test,
782 cx,
783 resp_tx.clone(),
784 setup_script_data,
785 ))
786 })
787 }
788 .await;
789
790 let Some(result) = ret.pop() else {
793 warn!(
794 "no task was started for test instance: {}",
795 test_instance.id()
796 );
797 return None;
798 };
799 result.err()
800 }
801 };
802
803 (threads_required, test_group, f)
804 })
805 .future_queue_grouped(self.test_threads, groups)
808 .filter_map(std::future::ready)
810 .collect::<Vec<_>>()
811 .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });
816
817 scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
818 });
819
820 let mut cancelled_count = 0;
827 let join_errors = results
828 .into_iter()
829 .flat_map(|r| {
830 match r {
831 Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
832 Ok(RunnerTaskState::Cancelled) => {
835 cancelled_count += 1;
836 Vec::new()
837 }
838 Err(join_error) => vec![join_error],
839 }
840 })
841 .collect::<Vec<_>>();
842
843 if cancelled_count > 0 {
844 debug!(
845 "{} tasks were cancelled -- this \
846 generally should only happen due to panics",
847 cancelled_count
848 );
849 }
850 if !join_errors.is_empty() {
851 return Err(join_errors);
852 }
853
854 Ok(())
855 }
856}
857
858pub fn configure_handle_inheritance(
871 no_capture: bool,
872) -> Result<(), ConfigureHandleInheritanceError> {
873 super::os::configure_handle_inheritance_impl(no_capture)
874}
875
876const FORCE_RUN_ID_ENV: &str = "__NEXTEST_FORCE_RUN_ID";
878
879fn force_or_new_run_id() -> ReportUuid {
881 if let Ok(id_str) = std::env::var(FORCE_RUN_ID_ENV) {
882 match id_str.parse::<ReportUuid>() {
883 Ok(uuid) => return uuid,
884 Err(err) => {
885 warn!(
886 "{FORCE_RUN_ID_ENV} is set but invalid (expected UUID): {err}, \
887 generating random ID"
888 );
889 }
890 }
891 }
892 ReportUuid::new_v4()
893}
894
895#[cfg(test)]
896mod tests {
897 use super::*;
898 use crate::{config::core::NextestConfig, platform::BuildPlatforms};
899
900 #[test]
901 fn no_capture_settings() {
902 let mut builder = TestRunnerBuilder::default();
904 builder
905 .set_capture_strategy(CaptureStrategy::None)
906 .set_test_threads(TestThreads::Count(20));
907 let test_list = TestList::empty();
908 let config = NextestConfig::default_config("/fake/dir");
909 let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
910 let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
911 let signal_handler = SignalHandlerKind::Noop;
912 let input_handler = InputHandlerKind::Noop;
913 let profile = profile.apply_build_platforms(&build_platforms);
914 let runner = builder
915 .build(
916 &test_list,
917 &profile,
918 vec![],
919 signal_handler,
920 input_handler,
921 DoubleSpawnInfo::disabled(),
922 TargetRunner::empty(),
923 )
924 .unwrap();
925 assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
926 assert_eq!(runner.inner.test_threads, 1, "tests run serially");
927 }
928
929 #[test]
930 fn test_debugger_command_parsing() {
931 let cmd = DebuggerCommand::from_str("gdb --args").unwrap();
933 assert_eq!(cmd.program(), "gdb");
934 assert_eq!(cmd.args(), &["--args"]);
935
936 let cmd = DebuggerCommand::from_str("rust-gdb -ex run --args").unwrap();
937 assert_eq!(cmd.program(), "rust-gdb");
938 assert_eq!(cmd.args(), &["-ex", "run", "--args"]);
939
940 let cmd = DebuggerCommand::from_str(r#"gdb -ex "set print pretty on" --args"#).unwrap();
942 assert_eq!(cmd.program(), "gdb");
943 assert_eq!(cmd.args(), &["-ex", "set print pretty on", "--args"]);
944
945 let err = DebuggerCommand::from_str("").unwrap_err();
947 assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));
948
949 let err = DebuggerCommand::from_str(" ").unwrap_err();
951 assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));
952 }
953}