nextest_runner/runner/
internal_events.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Internal events used between the runner components.
5//!
6//! These events often mirror those in [`crate::reporter::events`], but are used
7//! within the runner. They'll often carry additional information that the
8//! reporter doesn't need to know about.
9
10use super::{SetupScriptPacket, TestPacket};
11use crate::{
12    config::scripts::{ScriptId, SetupScriptConfig},
13    list::TestInstance,
14    reporter::{
15        TestOutputDisplay,
16        events::{
17            ExecuteStatus, ExecutionResult, InfoResponse, RetryData, SetupScriptEnvMap,
18            SetupScriptExecuteStatus, StressIndex, UnitState,
19        },
20    },
21    signal::ShutdownEvent,
22    test_output::ChildExecutionOutput,
23    time::StopwatchSnapshot,
24};
25use nextest_metadata::MismatchReason;
26use std::time::Duration;
27use tokio::{
28    sync::{
29        mpsc::{UnboundedReceiver, UnboundedSender},
30        oneshot,
31    },
32    task::JoinError,
33};
34
/// An internal event.
///
/// These events are sent by the executor (the part that actually runs
/// executables) to the dispatcher (the part of the runner that coordinates with
/// the external world).
///
/// Every variant carries an optional [`StressIndex`]. The `SetupScript*`
/// variants identify a setup script by its [`ScriptId`], its
/// [`SetupScriptConfig`] and a `program` string; all other variants identify a
/// test by its [`TestInstance`].
#[derive(Debug)]
pub(super) enum ExecutorEvent<'a> {
    /// Setup-script counterpart of [`Self::Started`]: it carries the same kind
    /// of `req_rx_tx` channel.
    SetupScriptStarted {
        stress_index: Option<StressIndex>,
        script_id: ScriptId,
        config: &'a SetupScriptConfig,
        program: String,
        // NOTE(review): presumably the position of this script among `total`
        // scripts -- confirm with the sender whether it is 0- or 1-based.
        index: usize,
        total: usize,
        // See the note in the `Started` variant.
        req_rx_tx: oneshot::Sender<UnboundedReceiver<RunUnitRequest<'a>>>,
    },
    /// Setup-script counterpart of [`Self::Slow`].
    SetupScriptSlow {
        stress_index: Option<StressIndex>,
        script_id: ScriptId,
        config: &'a SetupScriptConfig,
        program: String,
        elapsed: Duration,
        // NOTE(review): presumably `Some` when the script is going to be
        // terminated, with the relevant duration -- confirm with the sender.
        will_terminate: Option<Duration>,
    },
    /// Setup-script counterpart of [`Self::Finished`]; `status` is the
    /// reporter-facing [`SetupScriptExecuteStatus`].
    SetupScriptFinished {
        stress_index: Option<StressIndex>,
        script_id: ScriptId,
        config: &'a SetupScriptConfig,
        program: String,
        index: usize,
        total: usize,
        status: SetupScriptExecuteStatus,
    },
    /// Sent for a test together with the oneshot channel over which the
    /// receiving end of its request channel is returned.
    Started {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        // The channel over which to return the unit request.
        //
        // The callback context is solely responsible for coordinating the
        // creation of all channels, such that it acts as the source of truth
        // for which units to broadcast messages out to. This oneshot channel is
        // used to let each test instance know to go ahead and start running
        // tests.
        //
        // Why do we use unbounded channels? Mostly to make life simpler --
        // these are low-traffic channels that we don't expect to be backed up.
        req_rx_tx: oneshot::Sender<UnboundedReceiver<RunUnitRequest<'a>>>,
    },
    /// Carries the elapsed time and retry data of a test.
    ///
    /// NOTE(review): going by the name, sent once a test is considered slow --
    /// confirm with the executor.
    Slow {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        retry_data: RetryData,
        elapsed: Duration,
        // NOTE(review): presumably `Some` when the test is going to be
        // terminated, with the relevant duration -- confirm with the sender.
        will_terminate: Option<Duration>,
    },
    /// Carries the status of a failed attempt (`run_status`) along with the
    /// delay before the next attempt.
    AttemptFailedWillRetry {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        failure_output: TestOutputDisplay,
        run_status: ExecuteStatus,
        delay_before_next_attempt: Duration,
    },
    /// Carries the retry data of a retry, plus a channel the dispatcher uses to
    /// confirm the retry.
    RetryStarted {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        retry_data: RetryData,
        // This is used to indicate that the dispatcher still wants to run the test.
        tx: oneshot::Sender<()>,
    },
    /// Carries the status of the last run of a test, together with the output
    /// display and JUnit storage settings for success and failure output.
    Finished {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        success_output: TestOutputDisplay,
        failure_output: TestOutputDisplay,
        junit_store_success_output: bool,
        junit_store_failure_output: bool,
        last_run_status: ExecuteStatus,
    },
    /// Carries the [`MismatchReason`] for a test.
    ///
    /// NOTE(review): going by the name, sent for tests that are not run --
    /// confirm with the executor.
    Skipped {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        reason: MismatchReason,
    },
}
120
/// A borrowed execution status for either kind of unit: a test or a setup
/// script.
///
/// `'status` is the lifetime of the borrow; `'a` is the lifetime parameter of
/// the underlying status type. The type only holds shared references, so it
/// is `Copy`.
#[derive(Clone, Copy)]
pub(super) enum UnitExecuteStatus<'a, 'status> {
    /// The status of a test.
    Test(&'status InternalExecuteStatus<'a>),
    /// The status of a setup script.
    SetupScript(&'status InternalSetupScriptExecuteStatus<'a>),
}
126
127impl<'a> UnitExecuteStatus<'a, '_> {
128    pub(super) fn info_response(&self) -> InfoResponse<'a> {
129        match self {
130            Self::Test(status) => status.test.info_response(
131                UnitState::Exited {
132                    result: status.result,
133                    time_taken: status.stopwatch_end.active,
134                    slow_after: status.slow_after,
135                },
136                status.output.clone(),
137            ),
138            Self::SetupScript(status) => status.script.info_response(
139                UnitState::Exited {
140                    result: status.result,
141                    time_taken: status.stopwatch_end.active,
142                    slow_after: status.slow_after,
143                },
144                status.output.clone(),
145            ),
146        }
147    }
148}
149
/// The execution status of a test, as tracked inside the runner.
///
/// It is turned into the reporter-facing [`ExecuteStatus`] by
/// `into_external`.
pub(super) struct InternalExecuteStatus<'a> {
    /// The test packet; supplies the retry data and the delay before start.
    pub(super) test: TestPacket<'a>,
    /// When `Some`, the external status reports the test as slow.
    // NOTE(review): presumably the duration after which the test was first
    // considered slow -- confirm with the code that sets it.
    pub(super) slow_after: Option<Duration>,
    /// The output captured from the child.
    pub(super) output: ChildExecutionOutput,
    /// The result of the execution.
    pub(super) result: ExecutionResult,
    /// Stopwatch snapshot; supplies the start time and the `active` duration
    /// reported as the time taken.
    pub(super) stopwatch_end: StopwatchSnapshot,
}
157
158impl InternalExecuteStatus<'_> {
159    pub(super) fn into_external(self) -> ExecuteStatus {
160        ExecuteStatus {
161            retry_data: self.test.retry_data(),
162            output: self.output,
163            result: self.result,
164            start_time: self.stopwatch_end.start_time.fixed_offset(),
165            time_taken: self.stopwatch_end.active,
166            is_slow: self.slow_after.is_some(),
167            delay_before_start: self.test.delay_before_start(),
168        }
169    }
170}
171
/// The execution status of a setup script, as tracked inside the runner.
///
/// It is turned into the reporter-facing [`SetupScriptExecuteStatus`] by
/// `into_external`.
pub(super) struct InternalSetupScriptExecuteStatus<'a> {
    /// The setup script packet.
    pub(super) script: SetupScriptPacket<'a>,
    /// When `Some`, the external status reports the script as slow.
    // NOTE(review): presumably the duration after which the script was first
    // considered slow -- confirm with the code that sets it.
    pub(super) slow_after: Option<Duration>,
    /// The output captured from the child.
    pub(super) output: ChildExecutionOutput,
    /// The result of the execution.
    pub(super) result: ExecutionResult,
    /// Stopwatch snapshot; supplies the start time and the `active` duration
    /// reported as the time taken.
    pub(super) stopwatch_end: StopwatchSnapshot,
    /// The environment map, if any; passed through unchanged to the external
    /// status.
    pub(super) env_map: Option<SetupScriptEnvMap>,
}
180
181impl InternalSetupScriptExecuteStatus<'_> {
182    pub(super) fn into_external(self) -> SetupScriptExecuteStatus {
183        SetupScriptExecuteStatus {
184            output: self.output,
185            result: self.result,
186            start_time: self.stopwatch_end.start_time.fixed_offset(),
187            time_taken: self.stopwatch_end.active,
188            is_slow: self.slow_after.is_some(),
189            env_map: self.env_map,
190        }
191    }
192}
193
/// Events sent from the dispatcher to individual unit execution tasks.
#[derive(Clone, Debug)]
pub(super) enum RunUnitRequest<'a> {
    /// A signal-related request; see [`SignalRequest`] for the possible kinds.
    Signal(SignalRequest),
    /// Non-signal cancellation requests (e.g. test failures) which should cause
    /// tests to exit in some states.
    OtherCancel,
    /// A query that expects a reply; see [`RunUnitQuery`].
    Query(RunUnitQuery<'a>),
}
203
204impl<'a> RunUnitRequest<'a> {
205    pub(super) fn drain(self, status: UnitExecuteStatus<'a, '_>) {
206        match self {
207            #[cfg(unix)]
208            Self::Signal(SignalRequest::Stop(sender)) => {
209                // The receiver being dead isn't really important.
210                let _ = sender.send(());
211            }
212            #[cfg(unix)]
213            Self::Signal(SignalRequest::Continue) => {}
214            Self::Signal(SignalRequest::Shutdown(_)) => {}
215            Self::OtherCancel => {}
216            Self::Query(RunUnitQuery::GetInfo(tx)) => {
217                // The receiver being dead isn't really important.
218                _ = tx.send(status.info_response());
219            }
220        }
221    }
222}
223
/// The signal-related requests carried by [`RunUnitRequest::Signal`].
///
/// The `Stop` and `Continue` variants only exist on Unix targets.
#[derive(Clone, Debug)]
pub(super) enum SignalRequest {
    // The mpsc sender is used by each test to indicate that the stop signal has been sent.
    #[cfg(unix)]
    Stop(UnboundedSender<()>),
    // NOTE(review): presumably the counterpart of `Stop` that resumes the
    // unit -- confirm with the signal handling code.
    #[cfg(unix)]
    Continue,
    // A shutdown request; the payload says which kind (see `ShutdownRequest`).
    Shutdown(ShutdownRequest),
}
233
/// The kind of shutdown being requested.
///
/// NOTE(review): presumably `Once` is the first shutdown request (with the
/// event that triggered it) and `Twice` a repeated one -- confirm with the
/// dispatcher's signal handling.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(super) enum ShutdownRequest {
    /// Carries the [`ShutdownEvent`] behind the request.
    Once(ShutdownEvent),
    /// Carries no further information.
    Twice,
}
239
/// Queries carried by [`RunUnitRequest::Query`].
#[derive(Clone, Debug)]
pub(super) enum RunUnitQuery<'a> {
    /// Asks for an [`InfoResponse`], which is sent back over the contained
    /// sender.
    GetInfo(UnboundedSender<InfoResponse<'a>>),
}
244
/// The reason for a termination, as tracked inside the runner.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum InternalTerminateReason {
    /// Termination because of a timeout.
    Timeout,
    /// Termination because of a signal; carries the [`ShutdownRequest`]
    /// involved.
    Signal(ShutdownRequest),
}
250
/// The state of a runner task.
pub(super) enum RunnerTaskState {
    /// The task finished. `child_join_errors` holds the [`JoinError`]s
    /// collected for its children; it is empty when no children were run.
    Finished { child_join_errors: Vec<JoinError> },
    /// The task was cancelled.
    Cancelled,
}
255
256impl RunnerTaskState {
257    /// Mark a runner task as finished and having not run any children.
258    pub(super) fn finished_no_children() -> Self {
259        Self::Finished {
260            child_join_errors: Vec::new(),
261        }
262    }
263}
264
/// The outcome of handling a signal.
///
/// Marked `#[must_use]` so that callers do not silently ignore it.
#[derive(Clone, Copy, Debug)]
#[must_use]
pub(super) enum HandleSignalResult {
    /// A job control signal was delivered.
    #[cfg(unix)]
    JobControl,

    /// The child was terminated.
    // On non-Windows targets the `dead_code` lint is expected to fire for this
    // variant, hence the conditional `expect`.
    #[cfg_attr(not(windows), expect(dead_code))]
    Terminated(TerminateChildResult),
}
276
/// How a child process came to an end when it was terminated.
///
/// Marked `#[must_use]` so that callers do not silently ignore it.
#[derive(Clone, Copy, Debug)]
#[must_use]
pub(super) enum TerminateChildResult {
    /// The child process exited without being forcibly killed.
    Exited,

    /// The child process was forcibly killed.
    Killed,
}