1use super::HandleSignalResult;
15use crate::{
16 config::{
17 EvaluatableProfile, LeakTimeout, LeakTimeoutResult, RetryPolicy, ScriptId,
18 SetupScriptCommand, SetupScriptConfig, SetupScriptExecuteData, SlowTimeout, TestGroup,
19 TestSettings,
20 },
21 double_spawn::DoubleSpawnInfo,
22 errors::{ChildError, ChildFdError, ChildStartError, ErrorList},
23 list::{TestExecuteContext, TestInstance, TestInstanceWithSettings, TestList},
24 reporter::events::{
25 ExecutionResult, FailureStatus, InfoResponse, RetryData, SetupScriptInfoResponse,
26 TestInfoResponse, UnitKind, UnitState,
27 },
28 runner::{
29 ExecutorEvent, InternalExecuteStatus, InternalSetupScriptExecuteStatus,
30 InternalTerminateReason, RunUnitQuery, RunUnitRequest, SignalRequest, UnitExecuteStatus,
31 parse_env_file,
32 },
33 target_runner::TargetRunner,
34 test_command::{ChildAccumulator, ChildFds},
35 test_output::{CaptureStrategy, ChildExecutionOutput, ChildOutput, ChildSplitOutput},
36 time::{PausableSleep, StopwatchStart},
37};
38use future_queue::FutureQueueContext;
39use nextest_metadata::FilterMatch;
40use quick_junit::ReportUuid;
41use rand::{Rng, distr::OpenClosed01};
42use std::{
43 fmt,
44 num::NonZeroUsize,
45 pin::Pin,
46 process::{ExitStatus, Stdio},
47 sync::Arc,
48 time::Duration,
49};
50use tokio::{
51 process::Child,
52 sync::{
53 mpsc::{UnboundedReceiver, UnboundedSender},
54 oneshot,
55 },
56};
57use tracing::{debug, instrument};
58
59#[derive(Debug)]
60pub(super) struct ExecutorContext<'a> {
61 run_id: ReportUuid,
62 profile: &'a EvaluatableProfile<'a>,
63 test_list: &'a TestList<'a>,
64 double_spawn: DoubleSpawnInfo,
65 target_runner: TargetRunner,
66 capture_strategy: CaptureStrategy,
67 force_retries: Option<RetryPolicy>,
69}
70
71impl<'a> ExecutorContext<'a> {
72 pub(super) fn new(
73 run_id: ReportUuid,
74 profile: &'a EvaluatableProfile<'a>,
75 test_list: &'a TestList<'a>,
76 double_spawn: DoubleSpawnInfo,
77 target_runner: TargetRunner,
78 capture_strategy: CaptureStrategy,
79 force_retries: Option<RetryPolicy>,
80 ) -> Self {
81 Self {
82 run_id,
83 profile,
84 test_list,
85 double_spawn,
86 target_runner,
87 capture_strategy,
88 force_retries,
89 }
90 }
91
92 pub(super) async fn run_setup_scripts(
94 &self,
95 resp_tx: UnboundedSender<ExecutorEvent<'a>>,
96 ) -> SetupScriptExecuteData<'a> {
97 let setup_scripts = self.profile.setup_scripts(self.test_list);
98 let total = setup_scripts.len();
99 debug!("running {} setup scripts", total);
100
101 let mut setup_script_data = SetupScriptExecuteData::new();
102
103 for (index, script) in setup_scripts.into_iter().enumerate() {
105 let this_resp_tx = resp_tx.clone();
106
107 let script_id = script.id.clone();
108 let config = script.config;
109 let program = config.command.program(
110 self.test_list.workspace_root(),
111 &self.test_list.rust_build_meta().target_directory,
112 );
113
114 let script_fut = async move {
115 let (req_rx_tx, req_rx_rx) = oneshot::channel();
116 let _ = this_resp_tx.send(ExecutorEvent::SetupScriptStarted {
117 script_id: script_id.clone(),
118 config,
119 program: program.clone(),
120 index,
121 total,
122 req_rx_tx,
123 });
124 let mut req_rx = match req_rx_rx.await {
125 Ok(req_rx) => req_rx,
126 Err(_) => {
127 return None;
130 }
131 };
132
133 let packet = SetupScriptPacket {
134 script_id: script_id.clone(),
135 config,
136 program: program.clone(),
137 };
138
139 let status = self
140 .run_setup_script(packet, &this_resp_tx, &mut req_rx)
141 .await;
142
143 drain_req_rx(req_rx, UnitExecuteStatus::SetupScript(&status));
146
147 let status = status.into_external();
148 let env_map = status.env_map.clone();
149
150 let _ = this_resp_tx.send(ExecutorEvent::SetupScriptFinished {
151 script_id,
152 config,
153 program,
154 index,
155 total,
156 status,
157 });
158
159 env_map.map(|env_map| (script, env_map))
160 };
161
162 if let Some((script, env_map)) = script_fut.await {
164 setup_script_data.add_script(script, env_map);
165 }
166 }
167
168 setup_script_data
169 }
170
171 pub(super) async fn run_test_instance(
173 &self,
174 test: TestInstanceWithSettings<'a>,
175 cx: FutureQueueContext,
176 resp_tx: UnboundedSender<ExecutorEvent<'a>>,
177 setup_script_data: Arc<SetupScriptExecuteData<'a>>,
178 ) {
179 debug!(test_name = test.instance.name, "running test");
180
181 let settings = Arc::new(test.settings);
182
183 let retry_policy = self.force_retries.unwrap_or_else(|| settings.retries());
184 let total_attempts = retry_policy.count() + 1;
185 let mut backoff_iter = BackoffIter::new(retry_policy);
186
187 if let FilterMatch::Mismatch { reason } = test.instance.test_info.filter_match {
188 debug_assert!(
189 false,
190 "this test should already have been skipped in a filter step"
191 );
192 let _ = resp_tx.send(ExecutorEvent::Skipped {
194 test_instance: test.instance,
195 reason,
196 });
197 return;
198 }
199
200 let (req_rx_tx, req_rx_rx) = oneshot::channel();
201
202 _ = resp_tx.send(ExecutorEvent::Started {
205 test_instance: test.instance,
206 req_rx_tx,
207 });
208 let mut req_rx = match req_rx_rx.await {
209 Ok(rx) => rx,
210 Err(_) => {
211 return;
214 }
215 };
216
217 let mut attempt = 0;
218 let mut delay = Duration::ZERO;
219 let last_run_status = loop {
220 attempt += 1;
221 let retry_data = RetryData {
222 attempt,
223 total_attempts,
224 };
225
226 if retry_data.attempt > 1 {
227 let (tx, rx) = oneshot::channel();
231 _ = resp_tx.send(ExecutorEvent::RetryStarted {
232 test_instance: test.instance,
233 retry_data,
234 tx,
235 });
236
237 match rx.await {
238 Ok(()) => {}
239 Err(_) => {
240 return;
243 }
244 }
245 }
246
247 let packet = TestPacket {
251 test_instance: test.instance,
252 cx: cx.clone(),
253 retry_data,
254 settings: settings.clone(),
255 setup_script_data: setup_script_data.clone(),
256 delay_before_start: delay,
257 };
258
259 let run_status = self.run_test(packet.clone(), &resp_tx, &mut req_rx).await;
260
261 if run_status.result.is_success() {
262 break run_status;
264 } else if retry_data.attempt < retry_data.total_attempts {
265 delay = backoff_iter
267 .next()
268 .expect("backoff delay must be non-empty");
269
270 let run_status = run_status.into_external();
271 let previous_result = run_status.result;
272 let previous_slow = run_status.is_slow;
273
274 let _ = resp_tx.send(ExecutorEvent::AttemptFailedWillRetry {
275 test_instance: test.instance,
276 failure_output: settings.failure_output(),
277 run_status,
278 delay_before_next_attempt: delay,
279 });
280
281 handle_delay_between_attempts(
282 &packet,
283 previous_result,
284 previous_slow,
285 delay,
286 &mut req_rx,
287 )
288 .await;
289 } else {
290 break run_status;
292 }
293 };
294
295 drain_req_rx(req_rx, UnitExecuteStatus::Test(&last_run_status));
296
297 let last_run_status = last_run_status.into_external();
302 let _ = resp_tx.send(ExecutorEvent::Finished {
303 test_instance: test.instance,
304 success_output: settings.success_output(),
305 failure_output: settings.failure_output(),
306 junit_store_success_output: settings.junit_store_success_output(),
307 junit_store_failure_output: settings.junit_store_failure_output(),
308 last_run_status,
309 });
310 }
311
312 #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
318 async fn run_setup_script(
319 &self,
320 script: SetupScriptPacket<'a>,
321 resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
322 req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
323 ) -> InternalSetupScriptExecuteStatus<'a> {
324 let mut stopwatch = crate::time::stopwatch();
325
326 match self
327 .run_setup_script_inner(script.clone(), &mut stopwatch, resp_tx, req_rx)
328 .await
329 {
330 Ok(status) => status,
331 Err(error) => InternalSetupScriptExecuteStatus {
332 script,
333 slow_after: None,
334 output: ChildExecutionOutput::StartError(error),
335 result: ExecutionResult::ExecFail,
336 stopwatch_end: stopwatch.snapshot(),
337 env_map: None,
338 },
339 }
340 }
341
342 #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
343 async fn run_setup_script_inner(
344 &self,
345 script: SetupScriptPacket<'a>,
346 stopwatch: &mut StopwatchStart,
347 resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
348 req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
349 ) -> Result<InternalSetupScriptExecuteStatus<'a>, ChildStartError> {
350 let mut cmd =
351 script.make_command(self.profile.name(), &self.double_spawn, self.test_list)?;
352 let command_mut = cmd.command_mut();
353
354 command_mut.env("NEXTEST_RUN_ID", format!("{}", self.run_id));
355 command_mut.stdin(Stdio::null());
356 super::os::set_process_group(command_mut);
357
358 let job = super::os::Job::create().ok();
361
362 if self.capture_strategy != CaptureStrategy::None {
364 if script.config.capture_stdout {
365 command_mut.stdout(std::process::Stdio::piped());
366 }
367 if script.config.capture_stderr {
368 command_mut.stderr(std::process::Stdio::piped());
369 }
370 }
371
372 let (mut child, env_path) = cmd
373 .spawn()
374 .map_err(|error| ChildStartError::Spawn(Arc::new(error)))?;
375 let child_pid = child
376 .id()
377 .expect("child has never been polled so must return a PID");
378
379 let _ = super::os::assign_process_to_job(&child, job.as_ref());
382
383 let mut status: Option<ExecutionResult> = None;
384 let slow_timeout = script
389 .config
390 .slow_timeout
391 .unwrap_or(SlowTimeout::VERY_LARGE);
392 let leak_timeout = script.config.leak_timeout.unwrap_or_default();
393
394 let mut interval_sleep = std::pin::pin!(crate::time::pausable_sleep(slow_timeout.period));
395
396 let mut timeout_hit = 0;
397
398 let child_fds = ChildFds::new_split(child.stdout.take(), child.stderr.take());
399 let mut child_acc = ChildAccumulator::new(child_fds);
400
401 let mut cx = UnitContext {
402 packet: UnitPacket::SetupScript(script.clone()),
403 slow_after: None,
404 };
405
406 let (res, leaked) = {
407 let res = loop {
408 tokio::select! {
409 () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
410 res = child.wait() => {
411 break res;
413 }
414 _ = &mut interval_sleep, if status.is_none() => {
415 cx.slow_after = Some(slow_timeout.period);
417
418 timeout_hit += 1;
419 let will_terminate = if let Some(terminate_after) = slow_timeout.terminate_after {
420 NonZeroUsize::new(timeout_hit as usize)
421 .expect("timeout_hit was just incremented")
422 >= terminate_after
423 } else {
424 false
425 };
426
427 if !slow_timeout.grace_period.is_zero() {
428 let _ = resp_tx.send(script.slow_event(
429 timeout_hit * slow_timeout.period,
432 will_terminate.then_some(slow_timeout.grace_period),
433 ));
434 }
435
436 if will_terminate {
437 _ = super::os::terminate_child(
445 &cx,
446 &mut child,
447 &mut child_acc,
448 InternalTerminateReason::Timeout,
449 stopwatch,
450 req_rx,
451 job.as_ref(),
452 slow_timeout.grace_period,
453 ).await;
454 status = Some(ExecutionResult::Timeout);
455 if slow_timeout.grace_period.is_zero() {
456 break child.wait().await;
457 }
458 } else {
460 interval_sleep.as_mut().reset_last_duration();
461 }
462 }
463 recv = req_rx.recv() => {
464 let req = recv.expect("a RecvError should never happen here");
468
469 match req {
470 RunUnitRequest::Signal(req) => {
471 #[cfg_attr(not(windows), expect(unused_variables))]
472 let res = handle_signal_request(
473 &cx,
474 &mut child,
475 &mut child_acc,
476 req,
477 stopwatch,
478 interval_sleep.as_mut(),
479 req_rx,
480 job.as_ref(),
481 slow_timeout.grace_period
482 ).await;
483
484 #[cfg(windows)]
497 {
498 if matches!(
499 res,
500 HandleSignalResult::Terminated(super::TerminateChildResult::Killed)
501 ) {
502 status = Some(ExecutionResult::Fail {
503 failure_status: FailureStatus::Abort(
504 crate::reporter::events::AbortStatus::JobObject,
505 ),
506 leaked: false,
507 });
508 }
509 }
510 }
511 RunUnitRequest::OtherCancel => {
512 }
515 RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
516 _ = sender.send(script.info_response(
517 UnitState::Running {
518 pid: child_pid,
519 time_taken: stopwatch.snapshot().active,
520 slow_after: cx.slow_after,
521 },
522 child_acc.snapshot_in_progress(UnitKind::WAITING_ON_SCRIPT_MESSAGE),
523 ));
524 }
525 }
526 }
527 }
528 };
529
530 let tentative_status = status.or_else(|| {
532 res.as_ref().ok().map(|res| {
533 create_execution_result(*res, &child_acc.errors, false, LeakTimeoutResult::Pass)
534 })
535 });
536
537 let leaked = detect_fd_leaks(
538 &cx,
539 child_pid,
540 &mut child_acc,
541 tentative_status,
542 leak_timeout,
543 stopwatch,
544 req_rx,
545 )
546 .await;
547
548 (res, leaked)
549 };
550
551 let exit_status = match res {
552 Ok(exit_status) => Some(exit_status),
553 Err(err) => {
554 child_acc.errors.push(ChildFdError::Wait(Arc::new(err)));
555 None
556 }
557 };
558
559 let exit_status = exit_status.expect("None always results in early return");
560
561 let exec_result = status.unwrap_or_else(|| {
562 create_execution_result(exit_status, &child_acc.errors, leaked, leak_timeout.result)
563 });
564
565 let mut errors: Vec<_> = child_acc.errors.into_iter().map(ChildError::from).collect();
567 let env_map = if exec_result.is_success() {
568 match parse_env_file(&env_path).await {
569 Ok(env_map) => Some(env_map),
570 Err(error) => {
571 errors.push(ChildError::SetupScriptOutput(error));
572 None
573 }
574 }
575 } else {
576 None
577 };
578
579 Ok(InternalSetupScriptExecuteStatus {
580 script,
581 slow_after: cx.slow_after,
582 output: ChildExecutionOutput::Output {
583 result: Some(exec_result),
584 output: child_acc.output.freeze(),
585 errors: ErrorList::new(UnitKind::WAITING_ON_SCRIPT_MESSAGE, errors),
586 },
587 result: exec_result,
588 stopwatch_end: stopwatch.snapshot(),
589 env_map,
590 })
591 }
592
593 #[instrument(level = "debug", skip(self, resp_tx, req_rx))]
595 async fn run_test(
596 &self,
597 test: TestPacket<'a>,
598 resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
599 req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
600 ) -> InternalExecuteStatus<'a> {
601 let mut stopwatch = crate::time::stopwatch();
602
603 match self
604 .run_test_inner(test.clone(), &mut stopwatch, resp_tx, req_rx)
605 .await
606 {
607 Ok(run_status) => run_status,
608 Err(error) => InternalExecuteStatus {
609 test,
610 slow_after: None,
611 output: ChildExecutionOutput::StartError(error),
612 result: ExecutionResult::ExecFail,
613 stopwatch_end: stopwatch.snapshot(),
614 },
615 }
616 }
617
618 async fn run_test_inner(
619 &self,
620 test: TestPacket<'a>,
621 stopwatch: &mut StopwatchStart,
622 resp_tx: &UnboundedSender<ExecutorEvent<'a>>,
623 req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
624 ) -> Result<InternalExecuteStatus<'a>, ChildStartError> {
625 let ctx = TestExecuteContext {
626 profile_name: self.profile.name(),
627 double_spawn: &self.double_spawn,
628 target_runner: &self.target_runner,
629 };
630 let mut cmd = test.test_instance.make_command(
631 &ctx,
632 self.test_list,
633 test.settings.run_wrapper(),
634 test.settings.run_extra_args(),
635 );
636 let command_mut = cmd.command_mut();
637
638 command_mut.env("__NEXTEST_ATTEMPT", format!("{}", test.retry_data.attempt));
640
641 command_mut.env("NEXTEST_RUN_ID", format!("{}", self.run_id));
642
643 command_mut.env(
645 "NEXTEST_TEST_GLOBAL_SLOT",
646 test.cx.global_slot().to_string(),
647 );
648 match test.settings.test_group() {
649 TestGroup::Custom(name) => {
650 debug_assert!(
651 test.cx.group_slot().is_some(),
652 "test_group being set implies group_slot is set"
653 );
654 command_mut.env("NEXTEST_TEST_GROUP", name.as_str());
655 }
656 TestGroup::Global => {
657 debug_assert!(
658 test.cx.group_slot().is_none(),
659 "test_group being unset implies group_slot is unset"
660 );
661 command_mut.env("NEXTEST_TEST_GROUP", TestGroup::GLOBAL_STR);
662 }
663 }
664 if let Some(group_slot) = test.cx.group_slot() {
665 command_mut.env("NEXTEST_TEST_GROUP_SLOT", group_slot.to_string());
666 } else {
667 command_mut.env("NEXTEST_TEST_GROUP_SLOT", "none");
668 }
669
670 command_mut.stdin(Stdio::null());
671 test.setup_script_data.apply(
672 &test.test_instance.to_test_query(),
673 &self.profile.filterset_ecx(),
674 command_mut,
675 );
676 super::os::set_process_group(command_mut);
677
678 let job = super::os::Job::create().ok();
681
682 let crate::test_command::Child {
683 mut child,
684 child_fds,
685 } = cmd
686 .spawn(self.capture_strategy)
687 .map_err(|error| ChildStartError::Spawn(Arc::new(error)))?;
688
689 let child_pid = child
694 .id()
695 .expect("child has never been polled so must return a PID");
696
697 let _ = super::os::assign_process_to_job(&child, job.as_ref());
700
701 let mut child_acc = ChildAccumulator::new(child_fds);
702
703 let mut status: Option<ExecutionResult> = None;
704 let slow_timeout = test.settings.slow_timeout();
705 let leak_timeout = test.settings.leak_timeout();
706
707 let mut interval_sleep = std::pin::pin!(crate::time::pausable_sleep(slow_timeout.period));
710
711 let mut timeout_hit = 0;
712
713 let mut cx = UnitContext {
714 packet: UnitPacket::Test(test.clone()),
715 slow_after: None,
716 };
717
718 let (res, leaked) = {
719 let res = loop {
720 tokio::select! {
721 () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
722 res = child.wait() => {
723 break res;
725 }
726 _ = &mut interval_sleep, if status.is_none() => {
727 cx.slow_after = Some(slow_timeout.period);
729
730 timeout_hit += 1;
731 let will_terminate = if let Some(terminate_after) = slow_timeout.terminate_after {
732 NonZeroUsize::new(timeout_hit as usize)
733 .expect("timeout_hit was just incremented")
734 >= terminate_after
735 } else {
736 false
737 };
738
739 if !slow_timeout.grace_period.is_zero() {
740 let _ = resp_tx.send(test.slow_event(
741 timeout_hit * slow_timeout.period,
744 will_terminate.then_some(slow_timeout.grace_period),
745 ));
746 }
747
748 if will_terminate {
749 _ = super::os::terminate_child(
757 &cx,
758 &mut child,
759 &mut child_acc,
760 InternalTerminateReason::Timeout,
761 stopwatch,
762 req_rx,
763 job.as_ref(),
764 slow_timeout.grace_period,
765 ).await;
766 status = Some(ExecutionResult::Timeout);
767 if slow_timeout.grace_period.is_zero() {
768 break child.wait().await;
769 }
770 } else {
772 interval_sleep.as_mut().reset_last_duration();
773 }
774 }
775 recv = req_rx.recv() => {
776 let req = recv.expect("req_rx sender is open");
779
780 match req {
781 RunUnitRequest::Signal(req) => {
782 #[cfg_attr(not(windows), expect(unused_variables))]
783 let res = handle_signal_request(
784 &cx,
785 &mut child,
786 &mut child_acc,
787 req,
788 stopwatch,
789 interval_sleep.as_mut(),
790 req_rx,
791 job.as_ref(),
792 slow_timeout.grace_period,
793 ).await;
794
795 #[cfg(windows)]
808 {
809 if matches!(
810 res,
811 HandleSignalResult::Terminated(super::TerminateChildResult::Killed)
812 ) {
813 status = Some(ExecutionResult::Fail {
814 failure_status: FailureStatus::Abort(
815 crate::reporter::events::AbortStatus::JobObject,
816 ),
817 leaked: false,
818 });
819 }
820 }
821 }
822 RunUnitRequest::OtherCancel => {
823 }
826 RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
827 _ = tx.send(test.info_response(
828 UnitState::Running {
829 pid: child_pid,
830 time_taken: stopwatch.snapshot().active,
831 slow_after: cx.slow_after,
832 },
833 child_acc.snapshot_in_progress(UnitKind::WAITING_ON_TEST_MESSAGE),
834 ));
835 }
836 }
837 }
838 };
839 };
840
841 let tentative_status = status.or_else(|| {
843 res.as_ref().ok().map(|res| {
844 create_execution_result(*res, &child_acc.errors, false, LeakTimeoutResult::Pass)
845 })
846 });
847
848 let leaked = detect_fd_leaks(
849 &cx,
850 child_pid,
851 &mut child_acc,
852 tentative_status,
853 leak_timeout,
854 stopwatch,
855 req_rx,
856 )
857 .await;
858
859 (res, leaked)
860 };
861
862 let exit_status = match res {
863 Ok(exit_status) => Some(exit_status),
864 Err(err) => {
865 child_acc.errors.push(ChildFdError::Wait(Arc::new(err)));
866 None
867 }
868 };
869
870 let exit_status = exit_status.expect("None always results in early return");
871 let exec_result = status.unwrap_or_else(|| {
872 create_execution_result(exit_status, &child_acc.errors, leaked, leak_timeout.result)
873 });
874
875 Ok(InternalExecuteStatus {
876 test,
877 slow_after: cx.slow_after,
878 output: ChildExecutionOutput::Output {
879 result: Some(exec_result),
880 output: child_acc.output.freeze(),
881 errors: ErrorList::new(UnitKind::WAITING_ON_TEST_MESSAGE, child_acc.errors),
882 },
883 result: exec_result,
884 stopwatch_end: stopwatch.snapshot(),
885 })
886 }
887}
888
889#[derive(Debug)]
890struct BackoffIter {
891 policy: RetryPolicy,
892 current_factor: f64,
893 remaining_attempts: usize,
894}
895
896impl BackoffIter {
897 const BACKOFF_EXPONENT: f64 = 2.;
898
899 fn new(policy: RetryPolicy) -> Self {
900 let remaining_attempts = policy.count();
901 Self {
902 policy,
903 current_factor: 1.,
904 remaining_attempts,
905 }
906 }
907
908 fn next_delay_and_jitter(&mut self) -> (Duration, bool) {
909 match self.policy {
910 RetryPolicy::Fixed { delay, jitter, .. } => (delay, jitter),
911 RetryPolicy::Exponential {
912 delay,
913 jitter,
914 max_delay,
915 ..
916 } => {
917 let factor = self.current_factor;
918 let exp_delay = delay.mul_f64(factor);
919
920 if let Some(max_delay) = max_delay {
922 if exp_delay > max_delay {
923 return (max_delay, jitter);
924 }
925 }
926
927 let next_factor = self.current_factor * Self::BACKOFF_EXPONENT;
928 self.current_factor = next_factor;
929
930 (exp_delay, jitter)
931 }
932 }
933 }
934
935 fn apply_jitter(duration: Duration) -> Duration {
936 let jitter: f64 = rand::rng().sample(OpenClosed01);
937 duration.mul_f64(0.5 + jitter / 2.)
939 }
940}
941
942impl Iterator for BackoffIter {
943 type Item = Duration;
944 fn next(&mut self) -> Option<Self::Item> {
945 if self.remaining_attempts > 0 {
946 let (mut delay, jitter) = self.next_delay_and_jitter();
947 if jitter {
948 delay = Self::apply_jitter(delay);
949 }
950 self.remaining_attempts -= 1;
951 Some(delay)
952 } else {
953 None
954 }
955 }
956}
957
958pub(super) struct UnitContext<'a> {
961 packet: UnitPacket<'a>,
962 slow_after: Option<Duration>,
966}
967
968impl<'a> UnitContext<'a> {
969 pub(super) fn packet(&self) -> &UnitPacket<'a> {
970 &self.packet
971 }
972
973 pub(super) fn info_response(
974 &self,
975 state: UnitState,
976 output: ChildExecutionOutput,
977 ) -> InfoResponse<'a> {
978 match &self.packet {
979 UnitPacket::SetupScript(packet) => packet.info_response(state, output),
980 UnitPacket::Test(packet) => packet.info_response(state, output),
981 }
982 }
983}
984
985#[derive(Clone, Debug)]
986pub(super) enum UnitPacket<'a> {
987 SetupScript(SetupScriptPacket<'a>),
988 Test(TestPacket<'a>),
989}
990
991impl UnitPacket<'_> {
992 pub(super) fn kind(&self) -> UnitKind {
993 match self {
994 Self::SetupScript(_) => UnitKind::Script,
995 Self::Test(_) => UnitKind::Test,
996 }
997 }
998}
999
1000#[derive(Clone)]
1001pub(super) struct TestPacket<'a> {
1002 test_instance: TestInstance<'a>,
1003 cx: FutureQueueContext,
1004 retry_data: RetryData,
1005 settings: Arc<TestSettings<'a>>,
1006 setup_script_data: Arc<SetupScriptExecuteData<'a>>,
1007 delay_before_start: Duration,
1008}
1009
1010impl<'a> TestPacket<'a> {
1011 fn slow_event(&self, elapsed: Duration, will_terminate: Option<Duration>) -> ExecutorEvent<'a> {
1012 ExecutorEvent::Slow {
1013 test_instance: self.test_instance,
1014 retry_data: self.retry_data,
1015 elapsed,
1016 will_terminate,
1017 }
1018 }
1019
1020 pub(super) fn retry_data(&self) -> RetryData {
1021 self.retry_data
1022 }
1023
1024 pub(super) fn delay_before_start(&self) -> Duration {
1025 self.delay_before_start
1026 }
1027
1028 pub(super) fn info_response(
1029 &self,
1030 state: UnitState,
1031 output: ChildExecutionOutput,
1032 ) -> InfoResponse<'a> {
1033 InfoResponse::Test(TestInfoResponse {
1034 test_instance: self.test_instance.id(),
1035 state,
1036 retry_data: self.retry_data,
1037 output,
1038 })
1039 }
1040}
1041
1042impl fmt::Debug for TestPacket<'_> {
1043 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1044 f.debug_struct("TestPacket")
1045 .field("test_instance", &self.test_instance.id())
1046 .field("cx", &self.cx)
1047 .finish_non_exhaustive()
1048 }
1049}
1050
1051#[derive(Clone, Debug)]
1052pub(super) struct SetupScriptPacket<'a> {
1053 script_id: ScriptId,
1054 config: &'a SetupScriptConfig,
1055 program: String,
1056}
1057
1058impl<'a> SetupScriptPacket<'a> {
1059 fn make_command(
1061 &self,
1062 profile_name: &str,
1063 double_spawn: &DoubleSpawnInfo,
1064 test_list: &TestList<'_>,
1065 ) -> Result<SetupScriptCommand, ChildStartError> {
1066 SetupScriptCommand::new(self.config, profile_name, double_spawn, test_list)
1067 }
1068
1069 fn slow_event(&self, elapsed: Duration, will_terminate: Option<Duration>) -> ExecutorEvent<'a> {
1070 ExecutorEvent::SetupScriptSlow {
1071 script_id: self.script_id.clone(),
1072 config: self.config,
1073 program: self.program.clone(),
1074 elapsed,
1075 will_terminate,
1076 }
1077 }
1078
1079 pub(super) fn info_response(
1080 &self,
1081 state: UnitState,
1082 output: ChildExecutionOutput,
1083 ) -> InfoResponse<'a> {
1084 InfoResponse::SetupScript(SetupScriptInfoResponse {
1085 script_id: self.script_id.clone(),
1086 program: self.program.clone(),
1087 args: &self.config.command.args,
1088 state,
1089 output,
1090 })
1091 }
1092}
1093
1094fn drain_req_rx<'a>(
1096 mut receiver: UnboundedReceiver<RunUnitRequest<'a>>,
1097 status: UnitExecuteStatus<'a, '_>,
1098) {
1099 receiver.close();
1101 loop {
1102 let message = receiver.try_recv();
1104 match message {
1105 Ok(message) => {
1106 message.drain(status);
1107 }
1108 Err(_) => {
1109 break;
1110 }
1111 }
1112 }
1113}
1114
1115async fn handle_delay_between_attempts<'a>(
1116 packet: &TestPacket<'a>,
1117 previous_result: ExecutionResult,
1118 previous_slow: bool,
1119 delay: Duration,
1120 req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
1121) {
1122 let mut sleep = std::pin::pin!(crate::time::pausable_sleep(delay));
1123 #[cfg_attr(not(unix), expect(unused_mut))]
1124 let mut waiting_stopwatch = crate::time::stopwatch();
1125
1126 loop {
1127 tokio::select! {
1128 _ = &mut sleep => {
1129 break;
1131 }
1132 recv = req_rx.recv() => {
1133 let req = recv.expect("req_rx sender is open");
1134
1135 match req {
1136 #[cfg(unix)]
1137 RunUnitRequest::Signal(SignalRequest::Stop(tx)) => {
1138 sleep.as_mut().pause();
1139 waiting_stopwatch.pause();
1140 _ = tx.send(());
1141 }
1142 #[cfg(unix)]
1143 RunUnitRequest::Signal(SignalRequest::Continue) => {
1144 if sleep.is_paused() {
1145 sleep.as_mut().resume();
1146 waiting_stopwatch.resume();
1147 }
1148 }
1149 RunUnitRequest::Signal(SignalRequest::Shutdown(_)) => {
1150 break;
1153 }
1154 RunUnitRequest::OtherCancel => {
1155 break;
1158 }
1159 RunUnitRequest::Query(RunUnitQuery::GetInfo(tx)) => {
1160 let waiting_snapshot = waiting_stopwatch.snapshot();
1161 _ = tx.send(
1162 packet.info_response(
1163 UnitState::DelayBeforeNextAttempt {
1164 previous_result,
1165 previous_slow,
1166 waiting_duration: waiting_snapshot.active,
1167 remaining: delay
1168 .checked_sub(waiting_snapshot.active)
1169 .unwrap_or_default(),
1170 },
1171 ChildExecutionOutput::Output {
1174 result: None,
1175 output: ChildOutput::Split(ChildSplitOutput {
1176 stdout: None,
1177 stderr: None,
1178 }),
1179 errors: None,
1180 },
1181 ),
1182 );
1183 }
1184 }
1185 }
1186 }
1187 }
1188}
1189
1190async fn detect_fd_leaks<'a>(
1198 cx: &UnitContext<'a>,
1199 child_pid: u32,
1200 child_acc: &mut ChildAccumulator,
1201 tentative_result: Option<ExecutionResult>,
1202 leak_timeout: LeakTimeout,
1203 stopwatch: &mut StopwatchStart,
1204 req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
1205) -> bool {
1206 loop {
1207 let mut sleep = std::pin::pin!(tokio::time::sleep(leak_timeout.period));
1210 let waiting_stopwatch = crate::time::stopwatch();
1211
1212 tokio::select! {
1213 () = child_acc.fill_buf(), if !child_acc.fds.is_done() => {}
1217 () = &mut sleep, if !child_acc.fds.is_done() => {
1218 break true;
1219 }
1220 recv = req_rx.recv(), if !child_acc.fds.is_done() => {
1221 let req = recv.expect("a RecvError should never happen here");
1225
1226 match req {
1227 RunUnitRequest::Signal(_) => {
1228 }
1230 RunUnitRequest::OtherCancel => {
1231 }
1234 RunUnitRequest::Query(RunUnitQuery::GetInfo(sender)) => {
1235 let snapshot = waiting_stopwatch.snapshot();
1236 let resp = cx.info_response(
1237 UnitState::Exiting {
1238 pid: child_pid,
1243 time_taken: stopwatch.snapshot().active,
1244 slow_after: cx.slow_after,
1245 tentative_result,
1246 waiting_duration: snapshot.active,
1247 remaining: leak_timeout.period
1248 .checked_sub(snapshot.active)
1249 .unwrap_or_default(),
1250 },
1251 child_acc.snapshot_in_progress(cx.packet.kind().waiting_on_message()),
1252 );
1253
1254 _ = sender.send(resp);
1255 }
1256 }
1257 }
1258 else => {
1259 break false;
1260 }
1261 }
1262 }
1263}
1264
1265#[expect(clippy::too_many_arguments)]
1269async fn handle_signal_request<'a>(
1270 cx: &UnitContext<'a>,
1271 child: &mut Child,
1272 child_acc: &mut ChildAccumulator,
1273 req: SignalRequest,
1274 stopwatch: &mut StopwatchStart,
1275 #[cfg_attr(not(unix), expect(unused_mut, unused_variables))] mut interval_sleep: Pin<
1276 &mut PausableSleep,
1277 >,
1278 req_rx: &mut UnboundedReceiver<RunUnitRequest<'a>>,
1279 job: Option<&super::os::Job>,
1280 grace_period: Duration,
1281) -> HandleSignalResult {
1282 match req {
1283 #[cfg(unix)]
1284 SignalRequest::Stop(sender) => {
1285 stopwatch.pause();
1288 interval_sleep.as_mut().pause();
1289 super::os::job_control_child(child, crate::signal::JobControlEvent::Stop);
1290 let _ = sender.send(());
1293 HandleSignalResult::JobControl
1294 }
1295 #[cfg(unix)]
1296 SignalRequest::Continue => {
1297 if stopwatch.is_paused() {
1300 stopwatch.resume();
1301 interval_sleep.as_mut().resume();
1302 super::os::job_control_child(child, crate::signal::JobControlEvent::Continue);
1303 }
1304 HandleSignalResult::JobControl
1305 }
1306 SignalRequest::Shutdown(event) => {
1307 let res = super::os::terminate_child(
1308 cx,
1309 child,
1310 child_acc,
1311 InternalTerminateReason::Signal(event),
1312 stopwatch,
1313 req_rx,
1314 job,
1315 grace_period,
1316 )
1317 .await;
1318 HandleSignalResult::Terminated(res)
1319 }
1320 }
1321}
1322
1323fn create_execution_result(
1324 exit_status: ExitStatus,
1325 child_errors: &[ChildFdError],
1326 leaked: bool,
1327 leak_timeout_result: LeakTimeoutResult,
1328) -> ExecutionResult {
1329 if !child_errors.is_empty() {
1330 ExecutionResult::ExecFail
1333 } else if exit_status.success() {
1334 if leaked {
1335 ExecutionResult::Leak {
1338 result: leak_timeout_result,
1339 }
1340 } else {
1341 ExecutionResult::Pass
1342 }
1343 } else {
1344 ExecutionResult::Fail {
1345 failure_status: FailureStatus::extract(exit_status),
1346 leaked,
1347 }
1348 }
1349}