nextest_runner/runner/
internal_events.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Internal events used between the runner components.
5//!
6//! These events often mirror those in [`crate::reporter::events`], but are used
7//! within the runner. They'll often carry additional information that the
8//! reporter doesn't need to know about.
9
10use super::{SetupScriptPacket, TestPacket};
11use crate::{
12    config::scripts::{ScriptId, SetupScriptConfig},
13    errors::DisplayErrorChain,
14    list::TestInstance,
15    reporter::{
16        TestOutputDisplay, UnitErrorDescription,
17        events::{
18            ChildExecutionOutputDescription, ErrorSummary, ExecuteStatus, ExecutionResult,
19            InfoResponse, OutputErrorSlice, RetryData, SetupScriptEnvMap, SetupScriptExecuteStatus,
20            StressIndex, UnitKind, UnitState,
21        },
22    },
23    signal::ShutdownEvent,
24    test_output::{ChildExecutionOutput, ChildSingleOutput},
25    time::StopwatchSnapshot,
26};
27use nextest_metadata::MismatchReason;
28use std::time::Duration;
29use tokio::{
30    sync::{
31        mpsc::{UnboundedReceiver, UnboundedSender},
32        oneshot,
33    },
34    task::JoinError,
35};
36
37/// An internal event.
38///
39/// These events are sent by the executor (the part that actually runs
40/// executables) to the dispatcher (the part of the runner that coordinates with
41/// the external world).
42#[derive(Debug)]
43pub(super) enum ExecutorEvent<'a> {
44    SetupScriptStarted {
45        stress_index: Option<StressIndex>,
46        script_id: ScriptId,
47        config: &'a SetupScriptConfig,
48        program: String,
49        index: usize,
50        total: usize,
51        // See the note in the `Started` variant.
52        req_rx_tx: oneshot::Sender<UnboundedReceiver<RunUnitRequest<'a>>>,
53    },
54    SetupScriptSlow {
55        stress_index: Option<StressIndex>,
56        script_id: ScriptId,
57        config: &'a SetupScriptConfig,
58        program: String,
59        elapsed: Duration,
60        will_terminate: Option<Duration>,
61    },
62    SetupScriptFinished {
63        stress_index: Option<StressIndex>,
64        script_id: ScriptId,
65        config: &'a SetupScriptConfig,
66        program: String,
67        index: usize,
68        total: usize,
69        status: SetupScriptExecuteStatus<ChildSingleOutput>,
70    },
71    Started {
72        stress_index: Option<StressIndex>,
73        test_instance: TestInstance<'a>,
74        command_line: Vec<String>,
75        // The channel over which to return the unit request.
76        //
77        // The callback context is solely responsible for coordinating the
78        // creation of all channels, such that it acts as the source of truth
79        // for which units to broadcast messages out to. This oneshot channel is
80        // used to let each test instance know to go ahead and start running
81        // tests.
82        //
83        // Why do we use unbounded channels? Mostly to make life simpler --
84        // these are low-traffic channels that we don't expect to be backed up.
85        req_rx_tx: oneshot::Sender<UnboundedReceiver<RunUnitRequest<'a>>>,
86    },
87    Slow {
88        stress_index: Option<StressIndex>,
89        test_instance: TestInstance<'a>,
90        retry_data: RetryData,
91        elapsed: Duration,
92        will_terminate: Option<Duration>,
93    },
94    AttemptFailedWillRetry {
95        stress_index: Option<StressIndex>,
96        test_instance: TestInstance<'a>,
97        failure_output: TestOutputDisplay,
98        run_status: ExecuteStatus<ChildSingleOutput>,
99        delay_before_next_attempt: Duration,
100    },
101    RetryStarted {
102        stress_index: Option<StressIndex>,
103        test_instance: TestInstance<'a>,
104        retry_data: RetryData,
105        command_line: Vec<String>,
106        // This is used to indicate that the dispatcher still wants to run the test.
107        tx: oneshot::Sender<()>,
108    },
109    Finished {
110        stress_index: Option<StressIndex>,
111        test_instance: TestInstance<'a>,
112        success_output: TestOutputDisplay,
113        failure_output: TestOutputDisplay,
114        junit_store_success_output: bool,
115        junit_store_failure_output: bool,
116        last_run_status: ExecuteStatus<ChildSingleOutput>,
117    },
118    Skipped {
119        stress_index: Option<StressIndex>,
120        test_instance: TestInstance<'a>,
121        reason: MismatchReason,
122    },
123}
124
125#[derive(Clone, Copy)]
126pub(super) enum UnitExecuteStatus<'a, 'status> {
127    Test(&'status InternalExecuteStatus<'a>),
128    SetupScript(&'status InternalSetupScriptExecuteStatus<'a>),
129}
130
131impl<'a> UnitExecuteStatus<'a, '_> {
132    pub(super) fn info_response(&self) -> InfoResponse<'a> {
133        match self {
134            Self::Test(status) => status.test.info_response(
135                UnitState::Exited {
136                    result: status.result,
137                    time_taken: status.stopwatch_end.active,
138                    slow_after: status.slow_after,
139                },
140                status.output.clone(),
141            ),
142            Self::SetupScript(status) => status.script.info_response(
143                UnitState::Exited {
144                    result: status.result,
145                    time_taken: status.stopwatch_end.active,
146                    slow_after: status.slow_after,
147                },
148                status.output.clone(),
149            ),
150        }
151    }
152}
153
154pub(super) struct InternalExecuteStatus<'a> {
155    pub(super) test: TestPacket<'a>,
156    pub(super) slow_after: Option<Duration>,
157    pub(super) output: ChildExecutionOutput,
158    pub(super) result: ExecutionResult,
159    pub(super) stopwatch_end: StopwatchSnapshot,
160}
161
162impl InternalExecuteStatus<'_> {
163    pub(super) fn into_external(self) -> ExecuteStatus<ChildSingleOutput> {
164        let output: ChildExecutionOutputDescription<ChildSingleOutput> = self.output.into();
165
166        // Compute the error summary and output error slice using
167        // UnitErrorDescription.
168        let desc = UnitErrorDescription::new(UnitKind::Test, &output);
169        let error_summary = desc.all_error_list().map(|errors| ErrorSummary {
170            short_message: errors.short_message(),
171            description: DisplayErrorChain::new(errors).to_string(),
172        });
173        let output_error_slice = desc.output_slice().map(|slice| OutputErrorSlice {
174            slice: slice.to_string(),
175            start: slice.combined_subslice().map(|s| s.start).unwrap_or(0),
176        });
177
178        ExecuteStatus {
179            retry_data: self.test.retry_data(),
180            output,
181            result: self.result.into(),
182            start_time: self.stopwatch_end.start_time.fixed_offset(),
183            time_taken: self.stopwatch_end.active,
184            is_slow: self.slow_after.is_some(),
185            delay_before_start: self.test.delay_before_start(),
186            error_summary,
187            output_error_slice,
188        }
189    }
190}
191
192pub(super) struct InternalSetupScriptExecuteStatus<'a> {
193    pub(super) script: SetupScriptPacket<'a>,
194    pub(super) slow_after: Option<Duration>,
195    pub(super) output: ChildExecutionOutput,
196    pub(super) result: ExecutionResult,
197    pub(super) stopwatch_end: StopwatchSnapshot,
198    pub(super) env_map: Option<SetupScriptEnvMap>,
199}
200
201impl InternalSetupScriptExecuteStatus<'_> {
202    pub(super) fn into_external(self) -> SetupScriptExecuteStatus<ChildSingleOutput> {
203        let output: ChildExecutionOutputDescription<ChildSingleOutput> = self.output.into();
204
205        // Compute the error summary using UnitErrorDescription.
206        // Setup scripts don't have output_error_slice since that's only for
207        // tests (setup scripts can fail in all kinds of ways, while tests fail
208        // in more predictable ones).
209        let desc = UnitErrorDescription::new(UnitKind::Script, &output);
210        let error_summary = desc.all_error_list().map(|errors| ErrorSummary {
211            short_message: errors.short_message(),
212            description: DisplayErrorChain::new(errors).to_string(),
213        });
214
215        SetupScriptExecuteStatus {
216            output,
217            result: self.result.into(),
218            start_time: self.stopwatch_end.start_time.fixed_offset(),
219            time_taken: self.stopwatch_end.active,
220            is_slow: self.slow_after.is_some(),
221            env_map: self.env_map,
222            error_summary,
223        }
224    }
225}
226
227/// Events sent from the dispatcher to individual unit execution tasks.
228#[derive(Clone, Debug)]
229pub(super) enum RunUnitRequest<'a> {
230    Signal(SignalRequest),
231    /// Non-signal cancellation requests (e.g. test failures) which should cause
232    /// tests to exit in some states.
233    OtherCancel,
234    Query(RunUnitQuery<'a>),
235}
236
237impl<'a> RunUnitRequest<'a> {
238    pub(super) fn drain(self, status: UnitExecuteStatus<'a, '_>) {
239        match self {
240            #[cfg(unix)]
241            Self::Signal(SignalRequest::Stop(sender)) => {
242                // The receiver being dead isn't really important.
243                let _ = sender.send(());
244            }
245            #[cfg(unix)]
246            Self::Signal(SignalRequest::Continue) => {}
247            Self::Signal(SignalRequest::Shutdown(_)) => {}
248            Self::OtherCancel => {}
249            Self::Query(RunUnitQuery::GetInfo(tx)) => {
250                // The receiver being dead isn't really important.
251                _ = tx.send(status.info_response());
252            }
253        }
254    }
255}
256
257#[derive(Clone, Debug)]
258pub(super) enum SignalRequest {
259    // The mpsc sender is used by each test to indicate that the stop signal has been sent.
260    #[cfg(unix)]
261    Stop(UnboundedSender<()>),
262    #[cfg(unix)]
263    Continue,
264    Shutdown(ShutdownRequest),
265}
266
267#[derive(Copy, Clone, Debug, Eq, PartialEq)]
268pub(super) enum ShutdownRequest {
269    Once(ShutdownEvent),
270    Twice,
271}
272
273#[derive(Clone, Debug)]
274pub(super) enum RunUnitQuery<'a> {
275    GetInfo(UnboundedSender<InfoResponse<'a>>),
276}
277
278#[derive(Clone, Copy, Debug, Eq, PartialEq)]
279pub(super) enum InternalTerminateReason {
280    Timeout,
281    Signal(ShutdownRequest),
282}
283
284pub(super) enum RunnerTaskState {
285    Finished { child_join_errors: Vec<JoinError> },
286    Cancelled,
287}
288
289impl RunnerTaskState {
290    /// Mark a runner task as finished and having not run any children.
291    pub(super) fn finished_no_children() -> Self {
292        Self::Finished {
293            child_join_errors: Vec::new(),
294        }
295    }
296}
297
298#[derive(Clone, Copy, Debug)]
299#[must_use]
300pub(super) enum HandleSignalResult {
301    /// A job control signal was delivered.
302    #[cfg(unix)]
303    JobControl,
304
305    /// The child was terminated.
306    #[cfg_attr(not(windows), expect(dead_code))]
307    Terminated(TerminateChildResult),
308}
309
310#[derive(Clone, Copy, Debug)]
311#[must_use]
312pub(super) enum TerminateChildResult {
313    /// The child process exited without being forcibly killed.
314    Exited,
315
316    /// The child process was forcibly killed.
317    Killed,
318}