1use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
5use crate::{
6 config::{
7 core::EvaluatableProfile,
8 elements::{MaxFail, RetryPolicy, TestGroup, TestThreads},
9 scripts::SetupScriptExecuteData,
10 },
11 double_spawn::DoubleSpawnInfo,
12 errors::{
13 ConfigureHandleInheritanceError, DebuggerCommandParseError, StressCountParseError,
14 TestRunnerBuildError, TestRunnerExecuteErrors, TracerCommandParseError,
15 },
16 input::{InputHandler, InputHandlerKind, InputHandlerStatus},
17 list::{OwnedTestInstanceId, TestInstanceWithSettings, TestList},
18 reporter::events::{ReporterEvent, RunStats, StressIndex},
19 runner::ExecutorEvent,
20 signal::{SignalHandler, SignalHandlerKind},
21 target_runner::TargetRunner,
22 test_output::CaptureStrategy,
23};
24use async_scoped::TokioScope;
25use chrono::{DateTime, Local};
26use future_queue::{FutureQueueContext, StreamExt};
27use futures::{future::Fuse, prelude::*};
28use nextest_metadata::FilterMatch;
29use quick_junit::ReportUuid;
30use semver::Version;
31use std::{
32 collections::BTreeSet, convert::Infallible, fmt, num::NonZero, pin::Pin, str::FromStr,
33 sync::Arc, time::Duration,
34};
35use tokio::{
36 runtime::Runtime,
37 sync::{mpsc::unbounded_channel, oneshot},
38 task::JoinError,
39};
40use tracing::{debug, warn};
41
/// A debugger invocation, split into the program to run and the arguments passed to it.
///
/// Values are created by parsing a command line through this type's `FromStr` implementation.
#[derive(Clone, Debug)]
pub struct DebuggerCommand {
    /// The first word of the command line.
    program: String,
    /// Every word after the program, in their original order.
    args: Vec<String>,
}

impl DebuggerCommand {
    /// Returns the program component of the command.
    pub fn program(&self) -> &str {
        self.program.as_str()
    }

    /// Returns the arguments that follow the program.
    pub fn args(&self) -> &[String] {
        self.args.as_slice()
    }
}
61
62impl FromStr for DebuggerCommand {
63 type Err = DebuggerCommandParseError;
64
65 fn from_str(command: &str) -> Result<Self, Self::Err> {
66 let mut parts =
67 shell_words::split(command).map_err(DebuggerCommandParseError::ShellWordsParse)?;
68 if parts.is_empty() {
69 return Err(DebuggerCommandParseError::EmptyCommand);
70 }
71 let program = parts.remove(0);
72 Ok(Self {
73 program,
74 args: parts,
75 })
76 }
77}
78
/// A tracer invocation, split into the program to run and the arguments passed to it.
///
/// Values are created by parsing a command line through this type's `FromStr` implementation.
#[derive(Clone, Debug)]
pub struct TracerCommand {
    /// The first word of the command line.
    program: String,
    /// Every word after the program, in their original order.
    args: Vec<String>,
}

impl TracerCommand {
    /// Returns the program component of the command.
    pub fn program(&self) -> &str {
        self.program.as_str()
    }

    /// Returns the arguments that follow the program.
    pub fn args(&self) -> &[String] {
        self.args.as_slice()
    }
}
97
98impl FromStr for TracerCommand {
99 type Err = TracerCommandParseError;
100
101 fn from_str(command: &str) -> Result<Self, Self::Err> {
102 let mut parts =
103 shell_words::split(command).map_err(TracerCommandParseError::ShellWordsParse)?;
104 if parts.is_empty() {
105 return Err(TracerCommandParseError::EmptyCommand);
106 }
107 let program = parts.remove(0);
108 Ok(Self {
109 program,
110 args: parts,
111 })
112 }
113}
114
/// Selects an optional debugger or tracer command for a test run.
///
/// The chosen variant drives the `should_*` predicates implemented on this type.
#[derive(Clone, Debug, Default)]
pub enum Interceptor {
    /// No debugger or tracer is in use. This is the default.
    #[default]
    None,

    /// A debugger, described by the given command.
    Debugger(DebuggerCommand),

    /// A tracer, described by the given command.
    Tracer(TracerCommand),
}
128
129impl Interceptor {
130 pub fn should_disable_timeouts(&self) -> bool {
134 match self {
135 Interceptor::None => false,
136 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
137 }
138 }
139
140 pub fn should_passthrough_stdin(&self) -> bool {
144 match self {
145 Interceptor::None | Interceptor::Tracer(_) => false,
146 Interceptor::Debugger(_) => true,
147 }
148 }
149
150 pub fn should_create_process_group(&self) -> bool {
155 match self {
156 Interceptor::None | Interceptor::Tracer(_) => true,
157 Interceptor::Debugger(_) => false,
158 }
159 }
160
161 pub fn should_skip_leak_detection(&self) -> bool {
165 match self {
166 Interceptor::None => false,
167 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
168 }
169 }
170
171 pub fn should_show_wrapper_command(&self) -> bool {
175 match self {
176 Interceptor::None => false,
177 Interceptor::Debugger(_) | Interceptor::Tracer(_) => true,
178 }
179 }
180
181 pub fn should_send_sigtstp(&self) -> bool {
188 match self {
189 Interceptor::None | Interceptor::Tracer(_) => true,
190 Interceptor::Debugger(_) => false,
191 }
192 }
193}
194
/// Version information that is exported to commands as environment variables.
///
/// `apply_env` writes these values as `NEXTEST_VERSION`, `NEXTEST_REQUIRED_VERSION` and
/// `NEXTEST_RECOMMENDED_VERSION`.
#[derive(Clone, Debug)]
pub struct VersionEnvVars {
    /// The current version, exported as `NEXTEST_VERSION`.
    pub current_version: Version,

    /// The required version, if any, exported as `NEXTEST_REQUIRED_VERSION`.
    pub required_version: Option<Version>,

    /// The recommended version, if any, exported as `NEXTEST_RECOMMENDED_VERSION`.
    pub recommended_version: Option<Version>,
}
210
211impl VersionEnvVars {
212 pub(super) fn apply_env(&self, cmd: &mut std::process::Command) {
214 cmd.env("NEXTEST_VERSION", self.current_version.to_string());
215 cmd.env(
216 "NEXTEST_REQUIRED_VERSION",
217 match &self.required_version {
218 Some(v) => v.to_string(),
219 None => "none".to_owned(),
220 },
221 );
222 cmd.env(
223 "NEXTEST_RECOMMENDED_VERSION",
224 match &self.recommended_version {
225 Some(v) => v.to_string(),
226 None => "none".to_owned(),
227 },
228 );
229 }
230}
231
/// The ID of a child: either a single process or, on Unix, a process group.
#[derive(Copy, Clone, Debug)]
pub(super) enum ChildPid {
    // The payload is marked as expected-unused on non-Unix targets; the only reader visible in
    // this file (`for_kill`) is compiled on Unix only.
    /// The ID of a single process.
    Process(#[cfg_attr(not(unix), expect(unused))] u32),

    /// The ID of a process group. This variant only exists on Unix.
    #[cfg(unix)]
    ProcessGroup(u32),
}
242
243impl ChildPid {
244 #[cfg(unix)]
251 pub(super) fn for_kill(self) -> i32 {
252 match self {
253 ChildPid::Process(pid) => pid as i32,
254 ChildPid::ProcessGroup(pid) => -(pid as i32),
255 }
256 }
257}
258
/// Builder for a [`TestRunner`].
///
/// Start from `TestRunnerBuilder::default()`, adjust it with the `set_*` methods, and finish
/// with `build`.
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
    /// How test output is captured. `CaptureStrategy::None` makes `build` use one test thread.
    capture_strategy: CaptureStrategy,
    /// Retry policy passed to the runner as `force_retries`, if set.
    retries: Option<RetryPolicy>,
    /// Max-fail setting; when unset, `build` takes it from the profile.
    max_fail: Option<MaxFail>,
    /// Test thread setting; when unset, `build` takes it from the profile.
    test_threads: Option<TestThreads>,
    /// Stress condition, if any. When set, the runner performs repeated sub-runs.
    stress_condition: Option<StressCondition>,
    /// Debugger or tracer configuration; defaults to `Interceptor::None`.
    interceptor: Interceptor,
    /// Optional set of test instance IDs, forwarded to the dispatcher context.
    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
    /// Optional version information, forwarded to the executor context.
    version_env_vars: Option<VersionEnvVars>,
}
271
272impl TestRunnerBuilder {
273 pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
284 self.capture_strategy = strategy;
285 self
286 }
287
288 pub fn set_retries(&mut self, retries: RetryPolicy) -> &mut Self {
290 self.retries = Some(retries);
291 self
292 }
293
294 pub fn set_max_fail(&mut self, max_fail: MaxFail) -> &mut Self {
296 self.max_fail = Some(max_fail);
297 self
298 }
299
300 pub fn set_test_threads(&mut self, test_threads: TestThreads) -> &mut Self {
302 self.test_threads = Some(test_threads);
303 self
304 }
305
306 pub fn set_stress_condition(&mut self, stress_condition: StressCondition) -> &mut Self {
308 self.stress_condition = Some(stress_condition);
309 self
310 }
311
312 pub fn set_interceptor(&mut self, interceptor: Interceptor) -> &mut Self {
314 self.interceptor = interceptor;
315 self
316 }
317
318 pub fn set_expected_outstanding(
324 &mut self,
325 expected: BTreeSet<OwnedTestInstanceId>,
326 ) -> &mut Self {
327 self.expected_outstanding = Some(expected);
328 self
329 }
330
331 pub fn set_version_env_vars(&mut self, version_env_vars: VersionEnvVars) -> &mut Self {
333 self.version_env_vars = Some(version_env_vars);
334 self
335 }
336
337 #[expect(clippy::too_many_arguments)]
339 pub fn build<'a>(
340 self,
341 test_list: &'a TestList,
342 profile: &'a EvaluatableProfile<'a>,
343 cli_args: Vec<String>,
344 signal_handler: SignalHandlerKind,
345 input_handler: InputHandlerKind,
346 double_spawn: DoubleSpawnInfo,
347 target_runner: TargetRunner,
348 ) -> Result<TestRunner<'a>, TestRunnerBuildError> {
349 let test_threads = match self.capture_strategy {
350 CaptureStrategy::None => 1,
351 CaptureStrategy::Combined | CaptureStrategy::Split => self
352 .test_threads
353 .unwrap_or_else(|| profile.test_threads())
354 .compute(),
355 };
356 let max_fail = self.max_fail.unwrap_or_else(|| profile.max_fail());
357
358 let runtime = tokio::runtime::Builder::new_multi_thread()
359 .enable_all()
360 .thread_name("nextest-runner-worker")
361 .build()
362 .map_err(TestRunnerBuildError::TokioRuntimeCreate)?;
363 let _guard = runtime.enter();
364
365 let signal_handler = signal_handler.build()?;
367
368 let input_handler = input_handler.build();
369
370 Ok(TestRunner {
371 inner: TestRunnerInner {
372 run_id: force_or_new_run_id(),
373 started_at: Local::now(),
374 profile,
375 test_list,
376 test_threads,
377 double_spawn,
378 target_runner,
379 capture_strategy: self.capture_strategy,
380 force_retries: self.retries,
381 cli_args,
382 max_fail,
383 stress_condition: self.stress_condition,
384 interceptor: self.interceptor,
385 expected_outstanding: self.expected_outstanding,
386 version_env_vars: self.version_env_vars,
387 runtime,
388 },
389 signal_handler,
390 input_handler,
391 })
392 }
393}
394
/// The condition that bounds a stress run.
#[derive(Clone, Debug)]
pub enum StressCondition {
    /// Bound the stress run by a count (which may be infinite; see [`StressCount`]).
    Count(StressCount),

    /// Bound the stress run by a duration.
    Duration(Duration),
}
404
/// A count for stress runs: either a fixed, non-zero number or unbounded.
///
/// Serialized with serde as an internally tagged enum: the variant name is stored in kebab-case
/// under the `kind` key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
pub enum StressCount {
    /// A fixed count.
    Count {
        /// The count; guaranteed by its type to be non-zero.
        count: NonZero<u32>,
    },

    /// No upper bound on the count.
    Infinite,
}
419
420impl FromStr for StressCount {
421 type Err = StressCountParseError;
422
423 fn from_str(s: &str) -> Result<Self, Self::Err> {
424 if s == "infinite" {
425 Ok(StressCount::Infinite)
426 } else {
427 match s.parse() {
428 Ok(count) => Ok(StressCount::Count { count }),
429 Err(_) => Err(StressCountParseError::new(s)),
430 }
431 }
432 }
433}
434
/// Context for running tests.
///
/// Created using [`TestRunnerBuilder::build`].
#[derive(Debug)]
pub struct TestRunner<'a> {
    /// The run configuration and the Tokio runtime.
    inner: TestRunnerInner<'a>,
    /// Signal handler, borrowed mutably by the dispatcher during execution.
    signal_handler: SignalHandler,
    /// Input handler, borrowed mutably by the dispatcher during execution.
    input_handler: InputHandler,
}
444
445impl<'a> TestRunner<'a> {
446 pub fn run_id(&self) -> ReportUuid {
448 self.inner.run_id
449 }
450
451 pub fn started_at(&self) -> DateTime<Local> {
453 self.inner.started_at
454 }
455
456 pub fn input_handler_status(&self) -> InputHandlerStatus {
458 self.input_handler.status()
459 }
460
461 pub fn execute<F>(
467 self,
468 mut callback: F,
469 ) -> Result<RunStats, TestRunnerExecuteErrors<Infallible>>
470 where
471 F: FnMut(ReporterEvent<'a>) + Send,
472 {
473 self.try_execute::<Infallible, _>(|event| {
474 callback(event);
475 Ok(())
476 })
477 }
478
479 pub fn try_execute<E, F>(
486 mut self,
487 mut callback: F,
488 ) -> Result<RunStats, TestRunnerExecuteErrors<E>>
489 where
490 F: FnMut(ReporterEvent<'a>) -> Result<(), E> + Send,
491 E: fmt::Debug + Send,
492 {
493 let (report_cancel_tx, report_cancel_rx) = oneshot::channel();
494
495 let mut report_cancel_tx = Some(report_cancel_tx);
499 let mut first_error = None;
500
501 let res = self.inner.execute(
502 &mut self.signal_handler,
503 &mut self.input_handler,
504 report_cancel_rx,
505 |event| {
506 match callback(event) {
507 Ok(()) => {}
508 Err(error) => {
509 if let Some(report_cancel_tx) = report_cancel_tx.take() {
513 let _ = report_cancel_tx.send(());
514 first_error = Some(error);
515 }
516 }
517 }
518 },
519 );
520
521 self.inner.runtime.shutdown_background();
525
526 match (res, first_error) {
527 (Ok(run_stats), None) => Ok(run_stats),
528 (Ok(_), Some(report_error)) => Err(TestRunnerExecuteErrors {
529 report_error: Some(report_error),
530 join_errors: Vec::new(),
531 }),
532 (Err(join_errors), report_error) => Err(TestRunnerExecuteErrors {
533 report_error,
534 join_errors,
535 }),
536 }
537 }
538}
539
/// The state of a [`TestRunner`] apart from its signal and input handlers.
#[derive(Debug)]
struct TestRunnerInner<'a> {
    /// Unique ID for this run.
    run_id: ReportUuid,
    /// Local time at which the runner was built.
    started_at: DateTime<Local>,
    /// The profile that supplies settings such as the global timeout and test groups.
    profile: &'a EvaluatableProfile<'a>,
    /// The list of tests to run.
    test_list: &'a TestList<'a>,
    /// The computed number of test threads.
    test_threads: usize,
    /// Double-spawn information, passed to the executor context.
    double_spawn: DoubleSpawnInfo,
    /// Target runner, passed to the executor context.
    target_runner: TargetRunner,
    /// How test output is captured.
    capture_strategy: CaptureStrategy,
    /// Retry policy set on the builder, if any.
    force_retries: Option<RetryPolicy>,
    /// Command-line arguments, passed to the dispatcher context.
    cli_args: Vec<String>,
    /// The resolved max-fail setting.
    max_fail: MaxFail,
    /// Stress condition, if any; `Some` switches `execute` to repeated sub-runs.
    stress_condition: Option<StressCondition>,
    /// Debugger or tracer configuration.
    interceptor: Interceptor,
    /// Optional set of test instance IDs, passed to the dispatcher context.
    expected_outstanding: Option<BTreeSet<OwnedTestInstanceId>>,
    /// Optional version information, passed to the executor context.
    version_env_vars: Option<VersionEnvVars>,
    /// The Tokio runtime on which all runner tasks are spawned.
    runtime: Runtime,
}
559
560impl<'a> TestRunnerInner<'a> {
561 fn execute<F>(
562 &self,
563 signal_handler: &mut SignalHandler,
564 input_handler: &mut InputHandler,
565 report_cancel_rx: oneshot::Receiver<()>,
566 callback: F,
567 ) -> Result<RunStats, Vec<JoinError>>
568 where
569 F: FnMut(ReporterEvent<'a>) + Send,
570 {
571 let global_timeout = if self.interceptor.should_disable_timeouts() {
575 crate::time::far_future_duration()
576 } else {
577 self.profile.global_timeout(self.test_list.mode()).period
578 };
579
580 let mut dispatcher_cx = DispatcherContext::new(
581 callback,
582 self.run_id,
583 self.profile.name(),
584 self.cli_args.clone(),
585 self.test_list.run_count(),
586 self.max_fail,
587 global_timeout,
588 self.stress_condition.clone(),
589 self.expected_outstanding.clone(),
590 );
591
592 let executor_cx = ExecutorContext::new(
593 self.run_id,
594 self.profile,
595 self.test_list,
596 self.test_threads,
597 self.double_spawn.clone(),
598 self.target_runner.clone(),
599 self.capture_strategy,
600 self.force_retries,
601 self.interceptor.clone(),
602 self.version_env_vars.clone(),
603 );
604
605 dispatcher_cx.run_started(self.test_list, self.test_threads);
607
608 let _guard = self.runtime.enter();
609
610 let mut report_cancel_rx = std::pin::pin!(report_cancel_rx.fuse());
611
612 if self.stress_condition.is_some() {
613 loop {
614 let progress = dispatcher_cx
615 .stress_progress()
616 .expect("stress_condition is Some => stress progress is Some");
617 if progress.remaining().is_some() {
618 dispatcher_cx.stress_sub_run_started(progress);
619
620 self.do_run(
621 dispatcher_cx.stress_index(),
622 &mut dispatcher_cx,
623 &executor_cx,
624 signal_handler,
625 input_handler,
626 report_cancel_rx.as_mut(),
627 )?;
628
629 dispatcher_cx.stress_sub_run_finished();
630
631 if dispatcher_cx.cancel_reason().is_some() {
632 break;
633 }
634 } else {
635 break;
636 }
637 }
638 } else {
639 self.do_run(
640 None,
641 &mut dispatcher_cx,
642 &executor_cx,
643 signal_handler,
644 input_handler,
645 report_cancel_rx,
646 )?;
647 }
648
649 let run_stats = dispatcher_cx.run_stats();
650 dispatcher_cx.run_finished();
651
652 Ok(run_stats)
653 }
654
    /// Performs one run (or one stress sub-run) of setup scripts followed by tests.
    ///
    /// Three tasks are spawned inside a single blocking scope:
    ///
    /// 1. the dispatcher, which receives `ExecutorEvent`s over a channel;
    /// 2. the setup-script task, which hands its result to this thread over a second channel;
    /// 3. the test task, which is only spawned once the setup-script data has arrived.
    ///
    /// `stress_index` is `None` for a regular run and is attached to skipped-test events and
    /// passed to the executor otherwise.
    ///
    /// # Errors
    ///
    /// Returns every `JoinError` gathered from the spawned tasks and from per-test child
    /// tasks, if there is at least one.
    fn do_run<F>(
        &self,
        stress_index: Option<StressIndex>,
        dispatcher_cx: &mut DispatcherContext<'a, F>,
        executor_cx: &ExecutorContext<'a>,
        signal_handler: &mut SignalHandler,
        input_handler: &mut InputHandler,
        report_cancel_rx: Pin<&mut Fuse<oneshot::Receiver<()>>>,
    ) -> Result<(), Vec<JoinError>>
    where
        F: FnMut(ReporterEvent<'a>) + Send,
    {
        // `results` holds one entry per task spawned on `scope`.
        let ((), results) = TokioScope::scope_and_block(move |scope| {
            // Channel over which executor tasks report events to the dispatcher.
            let (resp_tx, resp_rx) = unbounded_channel::<ExecutorEvent<'a>>();

            // Task 1: the dispatcher.
            let dispatcher_fut =
                dispatcher_cx.run(resp_rx, signal_handler, input_handler, report_cancel_rx);
            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

            // Task 2: run the setup scripts and send their data back to this thread.
            let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
            let script_resp_tx = resp_tx.clone();
            let run_scripts_fut = async move {
                let script_data = executor_cx
                    .run_setup_scripts(stress_index, script_resp_tx)
                    .await;
                if script_tx.send(script_data).is_err() {
                    // The receiving side is gone; there is nobody left to run tests.
                    debug!("script_tx.send failed, shutting down");
                }
                RunnerTaskState::finished_no_children()
            };
            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

            // Block this thread until the script task delivers its data. `None` means the
            // sender was dropped without sending, so no tests are started.
            let Some(script_data) = script_rx.blocking_recv() else {
                debug!("no script data received, shutting down");
                return;
            };

            // (group name, computed max threads) pairs, consumed by `future_queue_grouped`.
            let groups = self
                .profile
                .test_group_config()
                .iter()
                .map(|(group_name, config)| (group_name, config.max_threads.compute()));

            // Shared by every test future.
            let setup_script_data = Arc::new(script_data);

            let filter_resp_tx = resp_tx.clone();

            let tests = self.test_list.to_priority_queue(self.profile);
            let run_tests_fut = futures::stream::iter(tests)
                .filter_map(move |test| {
                    let filter_resp_tx = filter_resp_tx.clone();
                    async move {
                        // Tests that do not match the filter are reported as skipped and
                        // dropped from the stream.
                        if let FilterMatch::Mismatch { reason } =
                            test.instance.test_info.filter_match
                        {
                            let _ = filter_resp_tx.send(ExecutorEvent::Skipped {
                                stress_index,
                                test_instance: test.instance,
                                reason,
                            });
                            return None;
                        }
                        Some(test)
                    }
                })
                .map(move |test: TestInstanceWithSettings<'a>| {
                    let threads_required =
                        test.settings.threads_required().compute(self.test_threads);
                    let test_group = match test.settings.test_group() {
                        TestGroup::Global => None,
                        TestGroup::Custom(name) => Some(name.clone()),
                    };
                    let resp_tx = resp_tx.clone();
                    let setup_script_data = setup_script_data.clone();

                    let test_instance = test.instance;

                    let f = move |cx: FutureQueueContext| {
                        debug!("running test instance: {}; cx: {cx:?}", test_instance.id());
                        async move {
                            // Each test instance runs as its own task inside a nested scope.
                            //
                            // SAFETY (NOTE(review): confirm against the async-scoped docs):
                            // `scope_and_collect` is unsafe; its contract is understood to
                            // require that the returned future is not forgotten. Here it is
                            // awaited to completion immediately.
                            let ((), mut ret) = unsafe {
                                TokioScope::scope_and_collect(move |scope| {
                                    scope.spawn(executor_cx.run_test_instance(
                                        stress_index,
                                        test,
                                        cx,
                                        resp_tx.clone(),
                                        setup_script_data,
                                    ))
                                })
                            }
                            .await;

                            // Exactly one task is spawned above, so one result is expected.
                            let Some(result) = ret.pop() else {
                                warn!(
                                    "no task was started for test instance: {}",
                                    test_instance.id()
                                );
                                return None;
                            };
                            // Keep only the join error, if the task failed to join.
                            result.err()
                        }
                    };

                    (threads_required, test_group, f)
                })
                .future_queue_grouped(self.test_threads, groups)
                // Drop the `None`s, leaving only join errors.
                .filter_map(std::future::ready)
                .collect::<Vec<_>>()
                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

            // Task 3: run all tests.
            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
        });

        // Flatten the per-task outcomes into one list of join errors, counting cancellations.
        let mut cancelled_count = 0;
        let join_errors = results
            .into_iter()
            .flat_map(|r| {
                match r {
                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
                    Ok(RunnerTaskState::Cancelled) => {
                        cancelled_count += 1;
                        Vec::new()
                    }
                    Err(join_error) => vec![join_error],
                }
            })
            .collect::<Vec<_>>();

        if cancelled_count > 0 {
            debug!(
                "{} tasks were cancelled -- this \
                 generally should only happen due to panics",
                cancelled_count
            );
        }
        if !join_errors.is_empty() {
            return Err(join_errors);
        }

        Ok(())
    }
846}
847
/// Configures handle inheritance for the current process.
///
/// This is a thin public wrapper that forwards `no_capture` to the platform-specific
/// `configure_handle_inheritance_impl` in the sibling `os` module.
///
/// # Errors
///
/// Returns whatever [`ConfigureHandleInheritanceError`] the platform implementation reports.
pub fn configure_handle_inheritance(
    no_capture: bool,
) -> Result<(), ConfigureHandleInheritanceError> {
    super::os::configure_handle_inheritance_impl(no_capture)
}
865
866const FORCE_RUN_ID_ENV: &str = "__NEXTEST_FORCE_RUN_ID";
868
869fn force_or_new_run_id() -> ReportUuid {
871 if let Ok(id_str) = std::env::var(FORCE_RUN_ID_ENV) {
872 match id_str.parse::<ReportUuid>() {
873 Ok(uuid) => return uuid,
874 Err(err) => {
875 warn!(
876 "{FORCE_RUN_ID_ENV} is set but invalid (expected UUID): {err}, \
877 generating random ID"
878 );
879 }
880 }
881 }
882 ReportUuid::new_v4()
883}
884
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{config::core::NextestConfig, platform::BuildPlatforms};

    /// With `CaptureStrategy::None`, the configured test thread count is ignored and the
    /// runner uses a single thread.
    #[test]
    fn no_capture_settings() {
        let mut builder = TestRunnerBuilder::default();
        // Ask for 20 threads; no-capture is expected to override this.
        builder
            .set_capture_strategy(CaptureStrategy::None)
            .set_test_threads(TestThreads::Count(20));
        let test_list = TestList::empty();
        let config = NextestConfig::default_config("/fake/dir");
        let profile = config.profile(NextestConfig::DEFAULT_PROFILE).unwrap();
        let build_platforms = BuildPlatforms::new_with_no_target().unwrap();
        let signal_handler = SignalHandlerKind::Noop;
        let input_handler = InputHandlerKind::Noop;
        let profile = profile.apply_build_platforms(&build_platforms);
        let runner = builder
            .build(
                &test_list,
                &profile,
                vec![],
                signal_handler,
                input_handler,
                DoubleSpawnInfo::disabled(),
                TargetRunner::empty(),
            )
            .unwrap();
        assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
        assert_eq!(runner.inner.test_threads, 1, "tests run serially");
    }

    /// Debugger command lines are split into a program and its arguments, and empty input
    /// is rejected.
    #[test]
    fn test_debugger_command_parsing() {
        // A program with a single argument.
        let cmd = DebuggerCommand::from_str("gdb --args").unwrap();
        assert_eq!(cmd.program(), "gdb");
        assert_eq!(cmd.args(), &["--args"]);

        // A program with several arguments.
        let cmd = DebuggerCommand::from_str("rust-gdb -ex run --args").unwrap();
        assert_eq!(cmd.program(), "rust-gdb");
        assert_eq!(cmd.args(), &["-ex", "run", "--args"]);

        // A quoted argument stays a single word.
        let cmd = DebuggerCommand::from_str(r#"gdb -ex "set print pretty on" --args"#).unwrap();
        assert_eq!(cmd.program(), "gdb");
        assert_eq!(cmd.args(), &["-ex", "set print pretty on", "--args"]);

        // The empty string is rejected.
        let err = DebuggerCommand::from_str("").unwrap_err();
        assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));

        // Whitespace-only input is rejected as well.
        let err = DebuggerCommand::from_str(" ").unwrap_err();
        assert!(matches!(err, DebuggerCommandParseError::EmptyCommand));
    }
}