nextest_runner/runner/
internal_events.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Internal events used between the runner components.
5//!
6//! These events often mirror those in [`crate::reporter::events`], but are used
7//! within the runner. They'll often carry additional information that the
8//! reporter doesn't need to know about.
9
10use super::{SetupScriptPacket, TestPacket};
11use crate::{
12    config::{ScriptId, SetupScriptConfig},
13    list::TestInstance,
14    reporter::{
15        TestOutputDisplay,
16        events::{
17            ExecuteStatus, ExecutionResult, InfoResponse, RetryData, SetupScriptEnvMap,
18            SetupScriptExecuteStatus, UnitState,
19        },
20    },
21    signal::ShutdownEvent,
22    test_output::ChildExecutionOutput,
23    time::StopwatchSnapshot,
24};
25use nextest_metadata::MismatchReason;
26use std::time::Duration;
27use tokio::{
28    sync::{
29        mpsc::{UnboundedReceiver, UnboundedSender},
30        oneshot,
31    },
32    task::JoinError,
33};
34
/// An internal event.
///
/// These events are sent by the executor (the part that actually runs
/// executables) to the dispatcher (the part of the runner that coordinates with
/// the external world).
///
/// The `'a` lifetime is shared by the borrowed data the events carry
/// (`&'a SetupScriptConfig`, `TestInstance<'a>`, `RunUnitRequest<'a>`).
#[derive(Debug)]
pub(super) enum ExecutorEvent<'a> {
    /// Setup-script "started" event.
    ///
    /// Carries the script's ID, a borrow of its configuration, and the program
    /// as a string.
    ///
    /// NOTE(review): `index` and `total` presumably give this script's position
    /// among all setup scripts being run -- confirm at the send site.
    SetupScriptStarted {
        script_id: ScriptId,
        config: &'a SetupScriptConfig,
        program: String,
        index: usize,
        total: usize,
        // See the note in the `Started` variant.
        req_rx_tx: oneshot::Sender<UnboundedReceiver<RunUnitRequest<'a>>>,
    },
    /// Setup-script "slow" event.
    ///
    /// NOTE(review): `elapsed` is presumably the time the script has been
    /// running, and `will_terminate` presumably is `Some` when the script is
    /// going to be terminated -- confirm against the executor.
    SetupScriptSlow {
        script_id: ScriptId,
        config: &'a SetupScriptConfig,
        program: String,
        elapsed: Duration,
        will_terminate: Option<Duration>,
    },
    /// Setup-script "finished" event, carrying the externally visible
    /// [`SetupScriptExecuteStatus`] for the run.
    SetupScriptFinished {
        script_id: ScriptId,
        config: &'a SetupScriptConfig,
        program: String,
        index: usize,
        total: usize,
        status: SetupScriptExecuteStatus,
    },
    /// Test "started" event.
    Started {
        test_instance: TestInstance<'a>,
        // The channel over which to return the unit request.
        //
        // The callback context is solely responsible for coordinating the
        // creation of all channels, such that it acts as the source of truth
        // for which units to broadcast messages out to. This oneshot channel is
        // used to let each test instance know to go ahead and start running
        // tests.
        //
        // Why do we use unbounded channels? Mostly to make life simpler --
        // these are low-traffic channels that we don't expect to be backed up.
        req_rx_tx: oneshot::Sender<UnboundedReceiver<RunUnitRequest<'a>>>,
    },
    /// Test "slow" event.
    ///
    /// NOTE(review): `elapsed` / `will_terminate` presumably have the same
    /// meaning as in `SetupScriptSlow` -- confirm against the executor.
    Slow {
        test_instance: TestInstance<'a>,
        retry_data: RetryData,
        elapsed: Duration,
        will_terminate: Option<Duration>,
    },
    /// A test attempt failed and another attempt will follow.
    ///
    /// Carries the status of the failed attempt, the display setting for
    /// failure output, and the delay before the next attempt.
    AttemptFailedWillRetry {
        test_instance: TestInstance<'a>,
        failure_output: TestOutputDisplay,
        run_status: ExecuteStatus,
        delay_before_next_attempt: Duration,
    },
    /// Test "retry started" event.
    RetryStarted {
        test_instance: TestInstance<'a>,
        retry_data: RetryData,
        // This is used to indicate that the dispatcher still wants to run the test.
        tx: oneshot::Sender<()>,
    },
    /// Test "finished" event.
    ///
    /// Carries the status of the last run, along with the output display
    /// settings and the flags for storing output in JUnit.
    Finished {
        test_instance: TestInstance<'a>,
        success_output: TestOutputDisplay,
        failure_output: TestOutputDisplay,
        junit_store_success_output: bool,
        junit_store_failure_output: bool,
        last_run_status: ExecuteStatus,
    },
    /// Test "skipped" event, with the [`MismatchReason`] attached to the skip.
    Skipped {
        test_instance: TestInstance<'a>,
        reason: MismatchReason,
    },
}
111
/// A borrowed execution status for a unit: either a test or a setup script.
///
/// `'status` is the lifetime of the borrow of the underlying status struct,
/// while `'a` is the lifetime parameter of that struct. The type only holds a
/// shared reference, so it is `Copy`.
#[derive(Clone, Copy)]
pub(super) enum UnitExecuteStatus<'a, 'status> {
    /// Status of a test.
    Test(&'status InternalExecuteStatus<'a>),
    /// Status of a setup script.
    SetupScript(&'status InternalSetupScriptExecuteStatus<'a>),
}
117
118impl<'a> UnitExecuteStatus<'a, '_> {
119    pub(super) fn info_response(&self) -> InfoResponse<'a> {
120        match self {
121            Self::Test(status) => status.test.info_response(
122                UnitState::Exited {
123                    result: status.result,
124                    time_taken: status.stopwatch_end.active,
125                    slow_after: status.slow_after,
126                },
127                status.output.clone(),
128            ),
129            Self::SetupScript(status) => status.script.info_response(
130                UnitState::Exited {
131                    result: status.result,
132                    time_taken: status.stopwatch_end.active,
133                    slow_after: status.slow_after,
134                },
135                status.output.clone(),
136            ),
137        }
138    }
139}
140
/// Runner-internal execution status for a test.
///
/// Converted into the reporter-facing [`ExecuteStatus`] via `into_external`.
pub(super) struct InternalExecuteStatus<'a> {
    /// The test packet; supplies the retry data and the delay before start
    /// when converting to the external status.
    pub(super) test: TestPacket<'a>,
    /// `Some` if the test was marked slow (the external `is_slow` flag is
    /// derived from `slow_after.is_some()`).
    ///
    /// NOTE(review): the duration presumably is the threshold after which the
    /// test was considered slow -- confirm.
    pub(super) slow_after: Option<Duration>,
    /// Output captured from the child execution.
    pub(super) output: ChildExecutionOutput,
    /// The execution result.
    pub(super) result: ExecutionResult,
    /// Stopwatch snapshot; its `start_time` and `active` fields provide the
    /// external start time and time taken.
    pub(super) stopwatch_end: StopwatchSnapshot,
}
148
149impl InternalExecuteStatus<'_> {
150    pub(super) fn into_external(self) -> ExecuteStatus {
151        ExecuteStatus {
152            retry_data: self.test.retry_data(),
153            output: self.output,
154            result: self.result,
155            start_time: self.stopwatch_end.start_time.fixed_offset(),
156            time_taken: self.stopwatch_end.active,
157            is_slow: self.slow_after.is_some(),
158            delay_before_start: self.test.delay_before_start(),
159        }
160    }
161}
162
/// Runner-internal execution status for a setup script.
///
/// Converted into the reporter-facing [`SetupScriptExecuteStatus`] via
/// `into_external`.
pub(super) struct InternalSetupScriptExecuteStatus<'a> {
    /// The setup script packet. Not carried over into the external status.
    pub(super) script: SetupScriptPacket<'a>,
    /// `Some` if the script was marked slow (the external `is_slow` flag is
    /// derived from `slow_after.is_some()`).
    ///
    /// NOTE(review): the duration presumably is the threshold after which the
    /// script was considered slow -- confirm.
    pub(super) slow_after: Option<Duration>,
    /// Output captured from the child execution.
    pub(super) output: ChildExecutionOutput,
    /// The execution result.
    pub(super) result: ExecutionResult,
    /// Stopwatch snapshot; its `start_time` and `active` fields provide the
    /// external start time and time taken.
    pub(super) stopwatch_end: StopwatchSnapshot,
    /// Environment map, if any, passed through unchanged to the external
    /// status.
    pub(super) env_map: Option<SetupScriptEnvMap>,
}
171
172impl InternalSetupScriptExecuteStatus<'_> {
173    pub(super) fn into_external(self) -> SetupScriptExecuteStatus {
174        SetupScriptExecuteStatus {
175            output: self.output,
176            result: self.result,
177            start_time: self.stopwatch_end.start_time.fixed_offset(),
178            time_taken: self.stopwatch_end.active,
179            is_slow: self.slow_after.is_some(),
180            env_map: self.env_map,
181        }
182    }
183}
184
/// Events sent from the dispatcher to individual unit execution tasks.
#[derive(Clone, Debug)]
pub(super) enum RunUnitRequest<'a> {
    /// A signal-related request; see [`SignalRequest`].
    Signal(SignalRequest),
    /// Non-signal cancellation requests (e.g. test failures) which should cause
    /// tests to exit in some states.
    OtherCancel,
    /// A query that expects a reply; see [`RunUnitQuery`].
    Query(RunUnitQuery<'a>),
}
194
195impl<'a> RunUnitRequest<'a> {
196    pub(super) fn drain(self, status: UnitExecuteStatus<'a, '_>) {
197        match self {
198            #[cfg(unix)]
199            Self::Signal(SignalRequest::Stop(sender)) => {
200                // The receiver being dead isn't really important.
201                let _ = sender.send(());
202            }
203            #[cfg(unix)]
204            Self::Signal(SignalRequest::Continue) => {}
205            Self::Signal(SignalRequest::Shutdown(_)) => {}
206            Self::OtherCancel => {}
207            Self::Query(RunUnitQuery::GetInfo(tx)) => {
208                // The receiver being dead isn't really important.
209                _ = tx.send(status.info_response());
210            }
211        }
212    }
213}
214
/// A signal-related request carried by `RunUnitRequest::Signal`.
///
/// The `Stop` and `Continue` variants only exist on Unix.
#[derive(Clone, Debug)]
pub(super) enum SignalRequest {
    // The mpsc sender is used by each test to indicate that the stop signal has been sent.
    /// Stop request (Unix only), with a sender used for acknowledgement.
    #[cfg(unix)]
    Stop(UnboundedSender<()>),
    /// Continue request (Unix only).
    #[cfg(unix)]
    Continue,
    /// Shutdown request; see [`ShutdownRequest`].
    Shutdown(ShutdownRequest),
}
224
/// A shutdown request.
///
/// NOTE(review): `Once` / `Twice` presumably distinguish the first shutdown
/// event from a repeated one -- confirm against the signal-handling code.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(super) enum ShutdownRequest {
    /// Carries the [`ShutdownEvent`] associated with the request.
    Once(ShutdownEvent),
    /// Carries no event.
    Twice,
}
230
/// A query carried by `RunUnitRequest::Query`.
#[derive(Clone, Debug)]
pub(super) enum RunUnitQuery<'a> {
    /// Request for unit information; the [`InfoResponse`] is sent back over
    /// the attached sender.
    GetInfo(UnboundedSender<InfoResponse<'a>>),
}
235
/// The reason for a termination, as tracked inside the runner.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum InternalTerminateReason {
    /// Termination due to a timeout.
    Timeout,
    /// Termination due to a signal, with the corresponding
    /// [`ShutdownRequest`].
    Signal(ShutdownRequest),
}
241
/// The state of a runner task.
pub(super) enum RunnerTaskState {
    /// The task finished. `child_join_errors` holds the [`JoinError`]s
    /// collected for the task's children; it is empty when no children were
    /// run (see `finished_no_children`).
    Finished { child_join_errors: Vec<JoinError> },
    /// The task was cancelled.
    Cancelled,
}
246
247impl RunnerTaskState {
248    /// Mark a runner task as finished and having not run any children.
249    pub(super) fn finished_no_children() -> Self {
250        Self::Finished {
251            child_join_errors: Vec::new(),
252        }
253    }
254}
255
/// The result of handling a signal.
///
/// Marked `#[must_use]`, so callers get a warning if they discard it.
#[derive(Clone, Copy, Debug)]
#[must_use]
pub(super) enum HandleSignalResult {
    /// A job control signal was delivered.
    #[cfg(unix)]
    JobControl,

    /// The child was terminated.
    // On non-Windows targets the `dead_code` lint is expected to fire for
    // this variant, hence the `expect` rather than a blanket `allow`.
    #[cfg_attr(not(windows), expect(dead_code))]
    Terminated(TerminateChildResult),
}
267
/// How a child process ended when it was terminated.
///
/// Marked `#[must_use]`, so callers get a warning if they discard it.
#[derive(Clone, Copy, Debug)]
#[must_use]
pub(super) enum TerminateChildResult {
    /// The child process exited without being forcibly killed.
    Exited,

    /// The child process was forcibly killed.
    Killed,
}