use super::HandleSignalResult;
use crate::{
    config::{
        core::EvaluatableProfile,
        elements::{LeakTimeout, LeakTimeoutResult, RetryPolicy, SlowTimeout, TestGroup},
        overrides::TestSettings,
        scripts::{ScriptId, SetupScriptCommand, SetupScriptConfig, SetupScriptExecuteData},
    },
    double_spawn::DoubleSpawnInfo,
    errors::{ChildError, ChildFdError, ChildStartError, ErrorList},
    list::{TestExecuteContext, TestInstance, TestInstanceWithSettings, TestList},
    reporter::events::{
        ExecutionResult, FailureStatus, InfoResponse, RetryData, SetupScriptInfoResponse,
        StressIndex, TestInfoResponse, UnitKind, UnitState,
    },
    runner::{
        ExecutorEvent, InternalExecuteStatus, InternalSetupScriptExecuteStatus,
        InternalTerminateReason, RunUnitQuery, RunUnitRequest, SignalRequest, UnitExecuteStatus,
        parse_env_file,
    },
    target_runner::TargetRunner,
    test_command::{ChildAccumulator, ChildFds},
    test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
    time::{PausableSleep, StopwatchStart},
};
use future_queue::FutureQueueContext;
use nextest_metadata::FilterMatch;
use quick_junit::ReportUuid;
use rand::{Rng, distr::OpenClosed01};
use std::{
    fmt,
    num::NonZeroUsize,
    pin::Pin,
    process::{ExitStatus, Stdio},
    sync::Arc,
    time::Duration,
};
use tokio::{
    process::Child,
    sync::{
        mpsc::{UnboundedReceiver, UnboundedSender},
        oneshot,
    },
};
use tracing::{debug, instrument};

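/// Per-run state shared by the executor: the run ID, evaluated profile, test
/// list, and process-spawning configuration (double-spawn info, target runner,
/// capture strategy), plus an optional retry-policy override applied to every
/// test.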
#[derive(Debug)]
pub(super) struct ExecutorContext<'a> {
    run_id: ReportUuid,
    profile: &'a EvaluatableProfile<'a>,
    test_list: &'a TestList<'a>,
    double_spawn: DoubleSpawnInfo,
    target_runner: TargetRunner,
    capture_strategy: CaptureStrategy,
    force_retries: Option<RetryPolicy>,
}

impl<'a> ExecutorContext<'a> {
    pub(super) fn new(
        run_id: ReportUuid,
        profile: &'a EvaluatableProfile<'a>,
        test_list: &'a TestList<'a>,
        double_spawn: DoubleSpawnInfo,
        target_runner: TargetRunner,
        capture_strategy: CaptureStrategy,
        force_retries: Option<RetryPolicy>,
    ) -> Self {
        Self {
            run_id,
            profile,
            test_list,
            double_spawn,
            target_runner,
            capture_strategy,
            force_retries,
        }
    }

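    /// Runs all setup scripts for the current profile sequentially, reporting
    /// progress through `resp_tx` and collecting the environment variables
    /// each successful script exports so they can later be applied to test
    /// commands.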
    pub(super) async fn run_setup_scripts(
        &self,
        stress_index: Option<StressIndex>,
        resp_tx: UnboundedSender<ExecutorEvent<'a>>,
    ) -> SetupScriptExecuteData<'a> {
        let setup_scripts = self.profile.setup_scripts(self.test_list);
        let total = setup_scripts.len();
        debug!("running {} setup scripts", total);

        let mut setup_script_data = SetupScriptExecuteData::new();

        for (index, script) in setup_scripts.into_iter().enumerate() {
            let this_resp_tx = resp_tx.clone();

            let script_id = script.id.clone();
            let config = script.config;
            let program = config.command.program(
                self.test_list.workspace_root(),
                &self.test_list.rust_build_meta().target_directory,
            );

            let script_fut = async move {
                let (req_rx_tx, req_rx_rx) = oneshot::channel();
                let _ = this_resp_tx.send(ExecutorEvent::SetupScriptStarted {
                    stress_index,
                    script_id: script_id.clone(),
                    config,
                    program: program.clone(),
                    index,
                    total,
                    req_rx_tx,
                });
                let mut req_rx = match req_rx_rx.await {
                    Ok(req_rx) => req_rx,
                    Err(_) => {
                        return None;
                    }
                };

                let packet = SetupScriptPacket {
                    stress_index,
                    script_id: script_id.clone(),
                    config,
                    program: program.clone(),
                };

                let status = self
                    .run_setup_script(packet, &this_resp_tx, &mut req_rx)
                    .await;

                drain_req_rx(req_rx, UnitExecuteStatus::SetupScript(&status));

                let status = status.into_external();
                let env_map = status.env_map.clone();

                let _ = this_resp_tx.send(ExecutorEvent::SetupScriptFinished {
                    stress_index,
                    script_id,
                    config,
                    program,
                    index,
                    total,
                    status,
                });

                env_map.map(|env_map| (script, env_map))
            };

            if let Some((script, env_map)) = script_fut.await {
                setup_script_data.add_script(script, env_map);
            }
        }

        setup_script_data
    }

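    /// Runs a single test instance, including retries.
    ///
    /// Each attempt goes through [`Self::run_test`]. After a failed attempt
    /// that still has retries left, the executor reports
    /// `AttemptFailedWillRetry` and waits out the backoff delay produced by
    /// [`BackoffIter`] before trying again; the final attempt's status is
    /// reported with `ExecutorEvent::Finished`.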
    pub(super) async fn run_test_instance(
        &self,
        stress_index: Option<StressIndex>,
        test: TestInstanceWithSettings<'a>,
        cx: FutureQueueContext,
        resp_tx: UnboundedSender<ExecutorEvent<'a>>,
        setup_script_data: Arc<SetupScriptExecuteData<'a>>,
    ) {
        debug!(test_name = test.instance.name, "running test");

        let settings = Arc::new(test.settings);

        let retry_policy = self.force_retries.unwrap_or_else(|| settings.retries());
        let total_attempts = retry_policy.count() + 1;
        let mut backoff_iter = BackoffIter::new(retry_policy);

        if let FilterMatch::Mismatch { reason } = test.instance.test_info.filter_match {
            debug_assert!(
                false,
                "this test should already have been skipped in a filter step"
            );
            let _ = resp_tx.send(ExecutorEvent::Skipped {
                stress_index,
                test_instance: test.instance,
                reason,
            });
            return;
        }

        let (req_rx_tx, req_rx_rx) = oneshot::channel();

        _ = resp_tx.send(ExecutorEvent::Started {
            stress_index,
            test_instance: test.instance,
            req_rx_tx,
        });
        let mut req_rx = match req_rx_rx.await {
            Ok(rx) => rx,
            Err(_) => {
                return;
            }
        };

        let mut attempt = 0;
        let mut delay = Duration::ZERO;
        let last_run_status = loop {
            attempt += 1;
            let retry_data = RetryData {
                attempt,
                total_attempts,
            };

            if retry_data.attempt > 1 {
                let (tx, rx) = oneshot::channel();
                _ = resp_tx.send(ExecutorEvent::RetryStarted {
                    stress_index,
                    test_instance: test.instance,
                    retry_data,
                    tx,
                });

                match rx.await {
                    Ok(()) => {}
                    Err(_) => {
                        return;
                    }
                }
            }

            let packet = TestPacket {
                stress_index,
                test_instance: test.instance,
                cx: cx.clone(),
                retry_data,
                settings: settings.clone(),
                setup_script_data: setup_script_data.clone(),
                delay_before_start: delay,
            };

            let run_status = self.run_test(packet.clone(), &resp_tx, &mut req_rx).await;

            if run_status.result.is_success() {
                break run_status;
            } else if retry_data.attempt < retry_data.total_attempts {
                delay = backoff_iter
                    .next()
                    .expect("backoff delay must be non-empty");

                let run_status = run_status.into_external();
                let previous_result = run_status.result;
                let previous_slow = run_status.is_slow;

                let _ = resp_tx.send(ExecutorEvent::AttemptFailedWillRetry {
                    stress_index,
                    test_instance: test.instance,
                    failure_output: settings.failure_output(),
                    run_status,
                    delay_before_next_attempt: delay,
                });

                handle_delay_between_attempts(
                    &packet,
                    previous_result,
                    previous_slow,
                    delay,
                    &mut req_rx,
                )
                .await;
            } else {
                break run_status;
            }
        };

        drain_req_rx(req_rx, UnitExecuteStatus::Test(&last_run_status));

        let last_run_status = last_run_status.into_external();
        let _ = resp_tx.send(ExecutorEvent::Finished {
            stress_index,
            test_instance: test.instance,
            success_output: settings.success_output(),
            failure_output: settings.failure_output(),
            junit_store_success_output: settings.junit_store_success_output(),
            junit_store_failure_output: settings.junit_store_failure_output(),
            last_run_status,
        });
    }

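    /// Runs a single setup script, mapping any failure to start the child
    /// process into an `ExecFail` result.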
    #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
    async fn run_setup_script(
        &self,
        script: SetupScriptPacket<'a>,
        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    ) -> InternalSetupScriptExecuteStatus<'a> {
        let mut stopwatch = crate::time::stopwatch();

        match self
            .run_setup_script_inner(script.clone(), &mut stopwatch, resp_tx, req_rx)
            .await
        {
            Ok(status) => status,
            Err(error) => InternalSetupScriptExecuteStatus {
                script,
                slow_after: None,
                output: ChildExecutionOutput::StartError(error),
                result: ExecutionResult::ExecFail,
                stopwatch_end: stopwatch.snapshot(),
                env_map: None,
            },
        }
    }

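    /// Spawns the setup script process and drives it to completion.
    ///
    /// The select loop below concurrently accumulates the child's output,
    /// enforces the slow timeout (and optional termination), and services
    /// signal requests and info queries. On success, the environment file
    /// written by the script is parsed into an env map.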
    #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
    async fn run_setup_script_inner(
        &self,
        script: SetupScriptPacket<'a>,
        stopwatch: &mut StopwatchStart,
        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    ) -> Result<InternalSetupScriptExecuteStatus<'a>, ChildStartError> {
        let mut cmd =
            script.make_command(self.profile.name(), &self.double_spawn, self.test_list)?;
        let command_mut = cmd.command_mut();

        command_mut.env("NEXTEST_RUN_ID", format!("{}", self.run_id));
        command_mut.stdin(Stdio::null());
        super::os::set_process_group(command_mut);

        let job = super::os::Job::create().ok();

        if self.capture_strategy != CaptureStrategy::None {
            if script.config.capture_stdout {
                command_mut.stdout(std::process::Stdio::piped());
            }
            if script.config.capture_stderr {
                command_mut.stderr(std::process::Stdio::piped());
            }
        }

        let (mut child, env_path) = cmd
            .spawn()
            .map_err(|error| ChildStartError::Spawn(Arc::new(error)))?;
        let child_pid = child
            .id()
            .expect("child has never been polled so must return a PID");

        let _ = super::os::assign_process_to_job(&child, job.as_ref());

        let mut status: Option<ExecutionResult> = None;
        let slow_timeout = script
            .config
            .slow_timeout
            .unwrap_or(SlowTimeout::VERY_LARGE);
        let leak_timeout = script.config.leak_timeout.unwrap_or_default();

        let mut interval_sleep = std::pin::pin!(crate::time::pausable_sleep(slow_timeout.period));

        let mut timeout_hit = 0;

        let child_fds = ChildFds::new_split(child.stdout.take(), child.stderr.take());
        let mut child_acc = ChildAccumulator::new(child_fds);

        let mut cx = UnitContext {
            packet: UnitPacket::SetupScript(script.clone()),
            slow_after: None,
        };

        let (res, leaked) = {
            let res = loop {
                tokio::select! {
                    () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
                    res = child.wait() => {
                        break res;
                    }
                    _ = &mut interval_sleep, if status.is_none() => {
                        cx.slow_after = Some(slow_timeout.period);

                        timeout_hit += 1;
                        let will_terminate = if let Some(terminate_after) = slow_timeout.terminate_after {
                            NonZeroUsize::new(timeout_hit as usize)
                                .expect("timeout_hit was just incremented")
                                >= terminate_after
                        } else {
                            false
                        };

                        if !slow_timeout.grace_period.is_zero() {
                            let _ = resp_tx.send(script.slow_event(
                                timeout_hit * slow_timeout.period,
                                will_terminate.then_some(slow_timeout.grace_period),
                            ));
                        }

                        if will_terminate {
                            _ = super::os::terminate_child(
                                &cx,
                                &mut child,
                                &mut child_acc,
                                InternalTerminateReason::Timeout,
                                stopwatch,
                                req_rx,
                                job.as_ref(),
                                slow_timeout.grace_period,
                            ).await;
                            status = Some(ExecutionResult::Timeout);
                            if slow_timeout.grace_period.is_zero() {
                                break child.wait().await;
                            }
                        } else {
                            interval_sleep.as_mut().reset_last_duration();
                        }
                    }
                    recv = req_rx.recv() => {
                        let req = recv.expect("a RecvError should never happen here");

                        match req {
                            RunUnitRequest::Signal(req) => {
                                #[cfg_attr(not(windows), expect(unused_variables))]
                                let res = handle_signal_request(
                                    &cx,
                                    &mut child,
                                    &mut child_acc,
                                    req,
                                    stopwatch,
                                    interval_sleep.as_mut(),
                                    req_rx,
                                    job.as_ref(),
                                    slow_timeout.grace_period
                                ).await;

                                #[cfg(windows)]
                                {
                                    if matches!(
                                        res,
                                        HandleSignalResult::Terminated(super::TerminateChildResult::Killed)
                                    ) {
                                        status = Some(ExecutionResult::Fail {
                                            failure_status: FailureStatus::Abort(
                                                crate::reporter::events::AbortStatus::JobObject,
                                            ),
                                            leaked: false,
                                        });
                                    }
                                }
                            }
                            RunUnitRequest::OtherCancel => {}
                            RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
                                _ = sender.send(script.info_response(
                                    UnitState::Running {
                                        pid: child_pid,
                                        time_taken: stopwatch.snapshot().active,
                                        slow_after: cx.slow_after,
                                    },
                                    child_acc.snapshot_in_progress(UnitKind::WAITING_ON_SCRIPT_MESSAGE),
                                ));
                            }
                        }
                    }
                }
            };

            let tentative_status = status.or_else(|| {
                res.as_ref().ok().map(|res| {
                    create_execution_result(*res, &child_acc.errors, false, LeakTimeoutResult::Pass)
                })
            });

            let leaked = detect_fd_leaks(
                &cx,
                child_pid,
                &mut child_acc,
                tentative_status,
                leak_timeout,
                stopwatch,
                req_rx,
            )
            .await;

            (res, leaked)
        };

        let exit_status = match res {
            Ok(exit_status) => Some(exit_status),
            Err(err) => {
                child_acc.errors.push(ChildFdError::Wait(Arc::new(err)));
                None
            }
        };

        let exit_status = exit_status.expect("None always results in early return");

        let exec_result = status.unwrap_or_else(|| {
            create_execution_result(exit_status, &child_acc.errors, leaked, leak_timeout.result)
        });

        let mut errors: Vec<_> = child_acc.errors.into_iter().map(ChildError::from).collect();
        let env_map = if exec_result.is_success() {
            match parse_env_file(&env_path).await {
                Ok(env_map) => Some(env_map),
                Err(error) => {
                    errors.push(ChildError::SetupScriptOutput(error));
                    None
                }
            }
        } else {
            None
        };

        Ok(InternalSetupScriptExecuteStatus {
            script,
            slow_after: cx.slow_after,
            output: ChildExecutionOutput::Output {
                result: Some(exec_result),
                output: child_acc.output.freeze(),
                errors: ErrorList::new(UnitKind::WAITING_ON_SCRIPT_MESSAGE, errors),
            },
            result: exec_result,
            stopwatch_end: stopwatch.snapshot(),
            env_map,
        })
    }

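    /// Runs a single test attempt, mapping any failure to start the child
    /// process into an `ExecFail` result.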
    #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
    async fn run_test(
        &self,
        test: TestPacket<'a>,
        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    ) -> InternalExecuteStatus<'a> {
        let mut stopwatch = crate::time::stopwatch();

        match self
            .run_test_inner(test.clone(), &mut stopwatch, resp_tx, req_rx)
            .await
        {
            Ok(run_status) => run_status,
            Err(error) => InternalExecuteStatus {
                test,
                slow_after: None,
                output: ChildExecutionOutput::StartError(error),
                result: ExecutionResult::ExecFail,
                stopwatch_end: stopwatch.snapshot(),
            },
        }
    }

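    /// Spawns the test process and drives it to completion.
    ///
    /// This mirrors [`Self::run_setup_script_inner`]: the select loop
    /// accumulates output, enforces the slow and leak timeouts, and handles
    /// signal and info requests. Environment variables describing the attempt
    /// number, run ID, and test group slots are set on the command before it
    /// is spawned.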
    async fn run_test_inner(
        &self,
        test: TestPacket<'a>,
        stopwatch: &mut StopwatchStart,
        resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
        req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    ) -> Result<InternalExecuteStatus<'a>, ChildStartError> {
        let ctx = TestExecuteContext {
            profile_name: self.profile.name(),
            double_spawn: &self.double_spawn,
            target_runner: &self.target_runner,
        };
        let mut cmd = test.test_instance.make_command(
            &ctx,
            self.test_list,
            test.settings.run_wrapper(),
            test.settings.run_extra_args(),
        );
        let command_mut = cmd.command_mut();

        command_mut.env("__NEXTEST_ATTEMPT", format!("{}", test.retry_data.attempt));

        command_mut.env("NEXTEST_RUN_ID", format!("{}", self.run_id));

        command_mut.env(
            "NEXTEST_TEST_GLOBAL_SLOT",
            test.cx.global_slot().to_string(),
        );
        match test.settings.test_group() {
            TestGroup::Custom(name) => {
                debug_assert!(
                    test.cx.group_slot().is_some(),
                    "test_group being set implies group_slot is set"
                );
                command_mut.env("NEXTEST_TEST_GROUP", name.as_str());
            }
            TestGroup::Global => {
                debug_assert!(
                    test.cx.group_slot().is_none(),
                    "test_group being unset implies group_slot is unset"
                );
                command_mut.env("NEXTEST_TEST_GROUP", TestGroup::GLOBAL_STR);
            }
        }
        if let Some(group_slot) = test.cx.group_slot() {
            command_mut.env("NEXTEST_TEST_GROUP_SLOT", group_slot.to_string());
        } else {
            command_mut.env("NEXTEST_TEST_GROUP_SLOT", "none");
        }

        command_mut.stdin(Stdio::null());
        test.setup_script_data.apply(
            &test.test_instance.to_test_query(),
            &self.profile.filterset_ecx(),
            command_mut,
        );
        super::os::set_process_group(command_mut);

        let job = super::os::Job::create().ok();

        let crate::test_command::Child {
            mut child,
            child_fds,
        } = cmd
            .spawn(self.capture_strategy)
            .map_err(|error| ChildStartError::Spawn(Arc::new(error)))?;

        let child_pid = child
            .id()
            .expect("child has never been polled so must return a PID");

        let _ = super::os::assign_process_to_job(&child, job.as_ref());

        let mut child_acc = ChildAccumulator::new(child_fds);

        let mut status: Option<ExecutionResult> = None;
        let slow_timeout = test.settings.slow_timeout();
        let leak_timeout = test.settings.leak_timeout();

        let mut interval_sleep = std::pin::pin!(crate::time::pausable_sleep(slow_timeout.period));

        let mut timeout_hit = 0;

        let mut cx = UnitContext {
            packet: UnitPacket::Test(test.clone()),
            slow_after: None,
        };

        let (res, leaked) = {
            let res = loop {
                tokio::select! {
                    () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
                    res = child.wait() => {
                        break res;
                    }
                    _ = &mut interval_sleep, if status.is_none() => {
                        cx.slow_after = Some(slow_timeout.period);

                        timeout_hit += 1;
                        let will_terminate = if let Some(terminate_after) = slow_timeout.terminate_after {
                            NonZeroUsize::new(timeout_hit as usize)
                                .expect("timeout_hit was just incremented")
                                >= terminate_after
                        } else {
                            false
                        };

                        if !slow_timeout.grace_period.is_zero() {
                            let _ = resp_tx.send(test.slow_event(
                                timeout_hit * slow_timeout.period,
                                will_terminate.then_some(slow_timeout.grace_period),
                            ));
                        }

                        if will_terminate {
                            _ = super::os::terminate_child(
                                &cx,
                                &mut child,
                                &mut child_acc,
                                InternalTerminateReason::Timeout,
                                stopwatch,
                                req_rx,
                                job.as_ref(),
                                slow_timeout.grace_period,
                            ).await;
                            status = Some(ExecutionResult::Timeout);
                            if slow_timeout.grace_period.is_zero() {
                                break child.wait().await;
                            }
                        } else {
                            interval_sleep.as_mut().reset_last_duration();
                        }
                    }
                    recv = req_rx.recv() => {
                        let req = recv.expect("req_rx sender is open");

                        match req {
                            RunUnitRequest::Signal(req) => {
                                #[cfg_attr(not(windows), expect(unused_variables))]
                                let res = handle_signal_request(
                                    &cx,
                                    &mut child,
                                    &mut child_acc,
                                    req,
                                    stopwatch,
                                    interval_sleep.as_mut(),
                                    req_rx,
                                    job.as_ref(),
                                    slow_timeout.grace_period,
                                ).await;

                                #[cfg(windows)]
                                {
                                    if matches!(
                                        res,
                                        HandleSignalResult::Terminated(super::TerminateChildResult::Killed)
                                    ) {
                                        status = Some(ExecutionResult::Fail {
                                            failure_status: FailureStatus::Abort(
                                                crate::reporter::events::AbortStatus::JobObject,
                                            ),
                                            leaked: false,
                                        });
                                    }
                                }
                            }
                            RunUnitRequest::OtherCancel => {}
                            RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
                                _ = tx.send(test.info_response(
                                    UnitState::Running {
                                        pid: child_pid,
                                        time_taken: stopwatch.snapshot().active,
                                        slow_after: cx.slow_after,
                                    },
                                    child_acc.snapshot_in_progress(UnitKind::WAITING_ON_TEST_MESSAGE),
                                ));
                            }
                        }
                    }
                };
            };

            let tentative_status = status.or_else(|| {
                res.as_ref().ok().map(|res| {
                    create_execution_result(*res, &child_acc.errors, false, LeakTimeoutResult::Pass)
                })
            });

            let leaked = detect_fd_leaks(
                &cx,
                child_pid,
                &mut child_acc,
                tentative_status,
                leak_timeout,
                stopwatch,
                req_rx,
            )
            .await;

            (res, leaked)
        };

        let exit_status = match res {
            Ok(exit_status) => Some(exit_status),
            Err(err) => {
                child_acc.errors.push(ChildFdError::Wait(Arc::new(err)));
                None
            }
        };

        let exit_status = exit_status.expect("None always results in early return");
        let exec_result = status.unwrap_or_else(|| {
            create_execution_result(exit_status, &child_acc.errors, leaked, leak_timeout.result)
        });

        Ok(InternalExecuteStatus {
            test,
            slow_after: cx.slow_after,
            output: ChildExecutionOutput::Output {
                result: Some(exec_result),
                output: child_acc.output.freeze(),
                errors: ErrorList::new(UnitKind::WAITING_ON_TEST_MESSAGE, child_acc.errors),
            },
            result: exec_result,
            stopwatch_end: stopwatch.snapshot(),
        })
    }
}

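/// Iterator over the delays to apply before successive retries, derived from a
/// [`RetryPolicy`].
///
/// A fixed policy yields the same delay each time. An exponential policy
/// multiplies the delay by `BACKOFF_EXPONENT` (2) on every attempt, capped at
/// `max_delay` if one is configured; for example, a 1-second base delay yields
/// roughly 1s, 2s, 4s, ... before jitter. When jitter is enabled, each delay
/// is scaled by a random factor in the half-open range (0.5, 1.0].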
#[derive(Debug)]
struct BackoffIter {
    policy: RetryPolicy,
    current_factor: f64,
    remaining_attempts: usize,
}

impl BackoffIter {
    const BACKOFF_EXPONENT: f64 = 2.;

    fn new(policy: RetryPolicy) -> Self {
        let remaining_attempts = policy.count();
        Self {
            policy,
            current_factor: 1.,
            remaining_attempts,
        }
    }

    fn next_delay_and_jitter(&mut self) -> (Duration, bool) {
        match self.policy {
            RetryPolicy::Fixed { delay, jitter, .. } => (delay, jitter),
            RetryPolicy::Exponential {
                delay,
                jitter,
                max_delay,
                ..
            } => {
                let factor = self.current_factor;
                let exp_delay = delay.mul_f64(factor);

                if let Some(max_delay) = max_delay {
                    if exp_delay > max_delay {
                        return (max_delay, jitter);
                    }
                }

                let next_factor = self.current_factor * Self::BACKOFF_EXPONENT;
                self.current_factor = next_factor;

                (exp_delay, jitter)
            }
        }
    }

    fn apply_jitter(duration: Duration) -> Duration {
        let jitter: f64 = rand::rng().sample(OpenClosed01);
        duration.mul_f64(0.5 + jitter / 2.)
    }
}

impl Iterator for BackoffIter {
    type Item = Duration;
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining_attempts > 0 {
            let (mut delay, jitter) = self.next_delay_and_jitter();
            if jitter {
                delay = Self::apply_jitter(delay);
            }
            self.remaining_attempts -= 1;
            Some(delay)
        } else {
            None
        }
    }
}

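/// Context shared by the helpers that operate on a running unit (a test or a
/// setup script): the packet describing the unit, and the slow-timeout period
/// after which the unit was first marked slow, if that has happened.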
pub(super) struct UnitContext<'a> {
    packet: UnitPacket<'a>,
    slow_after: Option<Duration>,
}

impl<'a> UnitContext<'a> {
    pub(super) fn packet(&self) -> &UnitPacket<'a> {
        &self.packet
    }

    pub(super) fn info_response(
        &self,
        state: UnitState,
        output: ChildExecutionOutput,
    ) -> InfoResponse<'a> {
        match &self.packet {
            UnitPacket::SetupScript(packet) => packet.info_response(state, output),
            UnitPacket::Test(packet) => packet.info_response(state, output),
        }
    }
}

#[derive(Clone, Debug)]
pub(super) enum UnitPacket<'a> {
    SetupScript(SetupScriptPacket<'a>),
    Test(TestPacket<'a>),
}

impl UnitPacket<'_> {
    pub(super) fn kind(&self) -> UnitKind {
        match self {
            Self::SetupScript(_) => UnitKind::Script,
            Self::Test(_) => UnitKind::Test,
        }
    }
}

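/// Everything needed to run one attempt of a single test: the test instance,
/// its resolved settings, retry bookkeeping, the setup-script data to apply to
/// its environment, and the backoff delay that preceded this attempt.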
#[derive(Clone)]
pub(super) struct TestPacket<'a> {
    stress_index: Option<StressIndex>,
    test_instance: TestInstance<'a>,
    cx: FutureQueueContext,
    retry_data: RetryData,
    settings: Arc<TestSettings<'a>>,
    setup_script_data: Arc<SetupScriptExecuteData<'a>>,
    delay_before_start: Duration,
}

impl<'a> TestPacket<'a> {
    fn slow_event(&self, elapsed: Duration, will_terminate: Option<Duration>) -> ExecutorEvent<'a> {
        ExecutorEvent::Slow {
            stress_index: self.stress_index,
            test_instance: self.test_instance,
            retry_data: self.retry_data,
            elapsed,
            will_terminate,
        }
    }

    pub(super) fn retry_data(&self) -> RetryData {
        self.retry_data
    }

    pub(super) fn delay_before_start(&self) -> Duration {
        self.delay_before_start
    }

    pub(super) fn info_response(
        &self,
        state: UnitState,
        output: ChildExecutionOutput,
    ) -> InfoResponse<'a> {
        InfoResponse::Test(TestInfoResponse {
            stress_index: self.stress_index,
            test_instance: self.test_instance.id(),
            state,
            retry_data: self.retry_data,
            output,
        })
    }
}

impl fmt::Debug for TestPacket<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TestPacket")
            .field("test_instance", &self.test_instance.id())
            .field("cx", &self.cx)
            .finish_non_exhaustive()
    }
}

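/// Everything needed to run a single setup script: its ID, configuration, and
/// the resolved program to invoke.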
#[derive(Clone, Debug)]
pub(super) struct SetupScriptPacket<'a> {
    stress_index: Option<StressIndex>,
    script_id: ScriptId,
    config: &'a SetupScriptConfig,
    program: String,
}

impl<'a> SetupScriptPacket<'a> {
    fn make_command(
        &self,
        profile_name: &str,
        double_spawn: &DoubleSpawnInfo,
        test_list: &TestList<'_>,
    ) -> Result<SetupScriptCommand, ChildStartError> {
        SetupScriptCommand::new(self.config, profile_name, double_spawn, test_list)
    }

    fn slow_event(&self, elapsed: Duration, will_terminate: Option<Duration>) -> ExecutorEvent<'a> {
        ExecutorEvent::SetupScriptSlow {
            stress_index: self.stress_index,
            script_id: self.script_id.clone(),
            config: self.config,
            program: self.program.clone(),
            elapsed,
            will_terminate,
        }
    }

    pub(super) fn info_response(
        &self,
        state: UnitState,
        output: ChildExecutionOutput,
    ) -> InfoResponse<'a> {
        InfoResponse::SetupScript(SetupScriptInfoResponse {
            stress_index: self.stress_index,
            script_id: self.script_id.clone(),
            program: self.program.clone(),
            args: &self.config.command.args,
            state,
            output,
        })
    }
}

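/// Closes a finished unit's request channel and drains any requests that were
/// already queued, so that pending queries are answered against the unit's
/// final status rather than being dropped.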
fn drain_req_rx<'a>(
    mut receiver: UnboundedReceiver<RunUnitRequest<'a>>,
    status: UnitExecuteStatus<'a, '_>,
) {
    receiver.close();
    loop {
        let message = receiver.try_recv();
        match message {
            Ok(message) => {
                message.drain(status);
            }
            Err(_) => {
                break;
            }
        }
    }
}

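/// Waits out the backoff delay between a failed test attempt and its retry.
///
/// While waiting, the unit still responds to job-control stop/continue
/// requests on Unix (pausing and resuming the sleep), to shutdown and
/// cancellation requests (which end the wait early), and to info queries
/// (which report a `DelayBeforeNextAttempt` state with the remaining time).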
async fn handle_delay_between_attempts<'a>(
    packet: &TestPacket<'a>,
    previous_result: ExecutionResult,
    previous_slow: bool,
    delay: Duration,
    req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
) {
    let mut sleep = std::pin::pin!(crate::time::pausable_sleep(delay));
    #[cfg_attr(not(unix), expect(unused_mut))]
    let mut waiting_stopwatch = crate::time::stopwatch();

    loop {
        tokio::select! {
            _ = &mut sleep => {
                break;
            }
            recv = req_rx.recv() => {
                let req = recv.expect("req_rx sender is open");

                match req {
                    #[cfg(unix)]
                    RunUnitRequest::Signal(SignalRequest::Stop(tx)) => {
                        sleep.as_mut().pause();
                        waiting_stopwatch.pause();
                        _ = tx.send(());
                    }
                    #[cfg(unix)]
                    RunUnitRequest::Signal(SignalRequest::Continue) => {
                        if sleep.is_paused() {
                            sleep.as_mut().resume();
                            waiting_stopwatch.resume();
                        }
                    }
                    RunUnitRequest::Signal(SignalRequest::Shutdown(_)) => {
                        break;
                    }
                    RunUnitRequest::OtherCancel => {
                        break;
                    }
                    RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
                        let waiting_snapshot = waiting_stopwatch.snapshot();
                        _ = tx.send(
                            packet.info_response(
                                UnitState::DelayBeforeNextAttempt {
                                    previous_result,
                                    previous_slow,
                                    waiting_duration: waiting_snapshot.active,
                                    remaining: delay
                                        .checked_sub(waiting_snapshot.active)
                                        .unwrap_or_default(),
                                },
                                ChildExecutionOutput::Output {
                                    result: None,
                                    output: ChildOutput::Split(ChildSplitOutput {
                                        stdout: None,
                                        stderr: None,
                                    }),
                                    errors: None,
                                },
                            ),
                        );
                    }
                }
            }
        }
    }
}

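/// After the child has exited, waits up to the leak timeout for its standard
/// output and standard error to be closed.
///
/// Returns true if the output handles are still open when the timeout fires,
/// which typically means the unit spawned a subprocess that inherited them and
/// outlived it; returns false once all handles are closed.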
async fn detect_fd_leaks<'a>(
    cx: &UnitContext<'a>,
    child_pid: u32,
    child_acc: &mut ChildAccumulator,
    tentative_result: Option<ExecutionResult>,
    leak_timeout: LeakTimeout,
    stopwatch: &mut StopwatchStart,
    req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
) -> bool {
    loop {
        let mut sleep = std::pin::pin!(tokio::time::sleep(leak_timeout.period));
        let waiting_stopwatch = crate::time::stopwatch();

        tokio::select! {
            () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
            () = &mut sleep, if !child_acc.fds.is_done() => {
                break true;
            }
            recv = req_rx.recv(), if !child_acc.fds.is_done() => {
                let req = recv.expect("a RecvError should never happen here");

                match req {
                    RunUnitRequest::Signal(_) => {}
                    RunUnitRequest::OtherCancel => {}
                    RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
                        let snapshot = waiting_stopwatch.snapshot();
                        let resp = cx.info_response(
                            UnitState::Exiting {
                                pid: child_pid,
                                time_taken: stopwatch.snapshot().active,
                                slow_after: cx.slow_after,
                                tentative_result,
                                waiting_duration: snapshot.active,
                                remaining: leak_timeout.period
                                    .checked_sub(snapshot.active)
                                    .unwrap_or_default(),
                            },
                            child_acc.snapshot_in_progress(cx.packet.kind().waiting_on_message()),
                        );

                        _ = sender.send(resp);
                    }
                }
            }
            else => {
                break false;
            }
        }
    }
}

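/// Applies a signal-related request to a running child process.
///
/// On Unix, stop and continue requests pause or resume the stopwatch and the
/// slow-timeout timer and forward the corresponding job-control signal to the
/// child. Shutdown requests terminate the child, honoring the configured grace
/// period.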
#[expect(clippy::too_many_arguments)]
async fn handle_signal_request<'a>(
    cx: &UnitContext<'a>,
    child: &mut Child,
    child_acc: &mut ChildAccumulator,
    req: SignalRequest,
    stopwatch: &mut StopwatchStart,
    #[cfg_attr(not(unix), expect(unused_mut, unused_variables))] mut interval_sleep: Pin<
        &mut PausableSleep,
    >,
    req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
    job: Option<&super::os::Job>,
    grace_period: Duration,
) -> HandleSignalResult {
    match req {
        #[cfg(unix)]
        SignalRequest::Stop(sender) => {
            stopwatch.pause();
            interval_sleep.as_mut().pause();
            super::os::job_control_child(child, crate::signal::JobControlEvent::Stop);
            let _ = sender.send(());
            HandleSignalResult::JobControl
        }
        #[cfg(unix)]
        SignalRequest::Continue => {
            if stopwatch.is_paused() {
                stopwatch.resume();
                interval_sleep.as_mut().resume();
                super::os::job_control_child(child, crate::signal::JobControlEvent::Continue);
            }
            HandleSignalResult::JobControl
        }
        SignalRequest::Shutdown(event) => {
            let res = super::os::terminate_child(
                cx,
                child,
                child_acc,
                InternalTerminateReason::Signal(event),
                stopwatch,
                req_rx,
                job,
                grace_period,
            )
            .await;
            HandleSignalResult::Terminated(res)
        }
    }
}

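/// Computes the final execution result for a unit.
///
/// Any error reading the child's output or waiting on it takes precedence and
/// yields `ExecFail`; a successful exit yields `Pass`, or `Leak` if output
/// handles were left open; an unsuccessful exit yields `Fail` with the failure
/// status extracted from the exit status.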
fn create_execution_result(
    exit_status: ExitStatus,
    child_errors: &[ChildFdError],
    leaked: bool,
    leak_timeout_result: LeakTimeoutResult,
) -> ExecutionResult {
    if !child_errors.is_empty() {
        ExecutionResult::ExecFail
    } else if exit_status.success() {
        if leaked {
            ExecutionResult::Leak {
                result: leak_timeout_result,
            }
        } else {
            ExecutionResult::Pass
        }
    } else {
        ExecutionResult::Fail {
            failure_status: FailureStatus::extract(exit_status),
            leaked,
        }
    }
}