Skip to main content

nextest_runner/runner/
internal_events.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Internal events used between the runner components.
5//!
6//! These events often mirror those in [`crate::reporter::events`], but are used
7//! within the runner. They'll often carry additional information that the
8//! reporter doesn't need to know about.
9
10use super::{SetupScriptPacket, TestPacket};
11use crate::{
12    config::{
13        elements::{FlakyResult, JunitFlakyFailStatus},
14        scripts::{ScriptId, SetupScriptConfig},
15    },
16    errors::DisplayErrorChain,
17    list::TestInstance,
18    output_spec::LiveSpec,
19    reporter::{
20        TestOutputDisplay, UnitErrorDescription,
21        events::{
22            ChildExecutionOutputDescription, ErrorSummary, ExecuteStatus, ExecutionResult,
23            ExecutionResultDescription, InfoResponse, OutputErrorSlice, RetryData,
24            SetupScriptEnvMap, SetupScriptExecuteStatus, StressIndex, TestSlotAssignment, UnitKind,
25            UnitState,
26        },
27    },
28    signal::ShutdownEvent,
29    test_output::ChildExecutionOutput,
30    time::StopwatchSnapshot,
31};
32use nextest_metadata::MismatchReason;
33use std::time::Duration;
34use tokio::{
35    sync::{
36        mpsc::{UnboundedReceiver, UnboundedSender},
37        oneshot,
38    },
39    task::JoinError,
40};
41
/// An internal event.
///
/// These events are sent by the executor (the part that actually runs
/// executables) to the dispatcher (the part of the runner that coordinates with
/// the external world).
///
/// The `'a` lifetime covers the data the variants borrow: the
/// `&'a SetupScriptConfig` references and the `TestInstance<'a>` values.
#[derive(Debug)]
pub(super) enum ExecutorEvent<'a> {
    /// Start notification for a setup script.
    ///
    /// Like `Started`, this carries the oneshot sender over which the unit's
    /// request receiver is returned.
    SetupScriptStarted {
        stress_index: Option<StressIndex>,
        script_id: ScriptId,
        config: &'a SetupScriptConfig,
        program: String,
        // NOTE(review): `index` and `total` presumably give this script's
        // position among the scripts being run -- confirm against the sender.
        index: usize,
        total: usize,
        // See the note in the `Started` variant.
        req_rx_tx: oneshot::Sender<UnboundedReceiver<RunUnitRequest<'a>>>,
    },
    /// Slow notification for a setup script.
    SetupScriptSlow {
        stress_index: Option<StressIndex>,
        script_id: ScriptId,
        config: &'a SetupScriptConfig,
        program: String,
        elapsed: Duration,
        // NOTE(review): presumably `Some` when the script is going to be
        // terminated; the exact meaning of the duration should be confirmed
        // against the sender.
        will_terminate: Option<Duration>,
    },
    /// Completion notification for a setup script, carrying its external
    /// (reporter-facing) execute status.
    SetupScriptFinished {
        stress_index: Option<StressIndex>,
        script_id: ScriptId,
        config: &'a SetupScriptConfig,
        program: String,
        index: usize,
        total: usize,
        status: SetupScriptExecuteStatus<LiveSpec>,
    },
    /// Start notification for a test.
    Started {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        slot_assignment: TestSlotAssignment,
        command_line: Vec<String>,
        // The channel over which to return the unit request.
        //
        // The callback context is solely responsible for coordinating the
        // creation of all channels, such that it acts as the source of truth
        // for which units to broadcast messages out to. This oneshot channel is
        // used to let each test instance know to go ahead and start running
        // tests.
        //
        // Why do we use unbounded channels? Mostly to make life simpler --
        // these are low-traffic channels that we don't expect to be backed up.
        req_rx_tx: oneshot::Sender<UnboundedReceiver<RunUnitRequest<'a>>>,
        // The configured result for flaky tests.
        flaky_result: FlakyResult,
    },
    /// Slow notification for a test.
    Slow {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        retry_data: RetryData,
        elapsed: Duration,
        // NOTE(review): presumably `Some` when the test is going to be
        // terminated; the exact meaning of the duration should be confirmed
        // against the sender.
        will_terminate: Option<Duration>,
    },
    /// A test attempt failed and another attempt is going to be made after
    /// `delay_before_next_attempt` (going by the variant and field names).
    AttemptFailedWillRetry {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        failure_output: TestOutputDisplay,
        run_status: ExecuteStatus<LiveSpec>,
        delay_before_next_attempt: Duration,
    },
    /// Start notification for a retry of a test.
    RetryStarted {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        slot_assignment: TestSlotAssignment,
        retry_data: RetryData,
        command_line: Vec<String>,
        // This is used to indicate that the dispatcher still wants to run the test.
        tx: oneshot::Sender<()>,
    },
    /// Completion notification for a test, carrying the status of its last run
    /// together with the output-display and JUnit settings that accompany it.
    Finished {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        success_output: TestOutputDisplay,
        failure_output: TestOutputDisplay,
        junit_store_success_output: bool,
        junit_store_failure_output: bool,
        junit_flaky_fail_status: JunitFlakyFailStatus,
        last_run_status: ExecuteStatus<LiveSpec>,
    },
    /// A test was skipped; `reason` records the mismatch behind the skip.
    Skipped {
        stress_index: Option<StressIndex>,
        test_instance: TestInstance<'a>,
        reason: MismatchReason,
    },
}
134
/// A cheap, copyable view over the internal execute status of a unit, which is
/// either a test or a setup script.
///
/// `'status` is the lifetime of the borrow of the status itself, while `'a` is
/// the lifetime parameter of the borrowed status type.
#[derive(Clone, Copy)]
pub(super) enum UnitExecuteStatus<'a, 'status> {
    /// Borrowed status of a test.
    Test(&'status InternalExecuteStatus<'a>),
    /// Borrowed status of a setup script.
    SetupScript(&'status InternalSetupScriptExecuteStatus<'a>),
}
140
141impl<'a> UnitExecuteStatus<'a, '_> {
142    pub(super) fn info_response(&self) -> InfoResponse<'a> {
143        match self {
144            Self::Test(status) => status.test.info_response(
145                UnitState::Exited {
146                    result: ExecutionResultDescription::from(status.result),
147                    time_taken: status.stopwatch_end.active,
148                    slow_after: status.slow_after,
149                },
150                status.output.clone(),
151            ),
152            Self::SetupScript(status) => status.script.info_response(
153                UnitState::Exited {
154                    result: ExecutionResultDescription::from(status.result),
155                    time_taken: status.stopwatch_end.active,
156                    slow_after: status.slow_after,
157                },
158                status.output.clone(),
159            ),
160        }
161    }
162}
163
/// Runner-internal status of a test execution.
///
/// Converted into the reporter-facing `ExecuteStatus<LiveSpec>` by
/// `into_external`.
pub(super) struct InternalExecuteStatus<'a> {
    // The packet for the test; supplies the retry data and the delay before
    // start when converting to the external status.
    pub(super) test: TestPacket<'a>,
    // `Some` means the external status is reported with `is_slow` set.
    pub(super) slow_after: Option<Duration>,
    // The output captured from the child execution.
    pub(super) output: ChildExecutionOutput,
    // The result of the execution.
    pub(super) result: ExecutionResult,
    // Stopwatch snapshot; its `start_time` and `active` fields become the
    // external start time and time taken.
    pub(super) stopwatch_end: StopwatchSnapshot,
}
171
172impl InternalExecuteStatus<'_> {
173    pub(super) fn into_external(self) -> ExecuteStatus<LiveSpec> {
174        let output: ChildExecutionOutputDescription<LiveSpec> = self.output.into();
175
176        // Compute the error summary and output error slice using
177        // UnitErrorDescription.
178        let desc = UnitErrorDescription::new(UnitKind::Test, &output);
179        let error_summary = desc.all_error_list().map(|errors| ErrorSummary {
180            short_message: errors.short_message(),
181            description: DisplayErrorChain::new(errors).to_string(),
182        });
183        let output_error_slice = desc.output_slice().map(|slice| OutputErrorSlice {
184            slice: slice.to_string(),
185            start: slice.combined_subslice().map(|s| s.start).unwrap_or(0),
186        });
187
188        ExecuteStatus {
189            retry_data: self.test.retry_data(),
190            output,
191            result: self.result.into(),
192            start_time: self.stopwatch_end.start_time.fixed_offset(),
193            time_taken: self.stopwatch_end.active,
194            is_slow: self.slow_after.is_some(),
195            delay_before_start: self.test.delay_before_start(),
196            error_summary,
197            output_error_slice,
198        }
199    }
200}
201
/// Runner-internal status of a setup script execution.
///
/// Converted into the reporter-facing `SetupScriptExecuteStatus<LiveSpec>` by
/// `into_external`.
pub(super) struct InternalSetupScriptExecuteStatus<'a> {
    // The packet for the setup script.
    pub(super) script: SetupScriptPacket<'a>,
    // `Some` means the external status is reported with `is_slow` set.
    pub(super) slow_after: Option<Duration>,
    // The output captured from the child execution.
    pub(super) output: ChildExecutionOutput,
    // The result of the execution.
    pub(super) result: ExecutionResult,
    // Stopwatch snapshot; its `start_time` and `active` fields become the
    // external start time and time taken.
    pub(super) stopwatch_end: StopwatchSnapshot,
    // Passed through unchanged to the external status.
    // NOTE(review): presumably the environment produced by the script, if
    // any -- confirm against where this is populated.
    pub(super) env_map: Option<SetupScriptEnvMap>,
}
210
211impl InternalSetupScriptExecuteStatus<'_> {
212    pub(super) fn into_external(self) -> SetupScriptExecuteStatus<LiveSpec> {
213        let output: ChildExecutionOutputDescription<LiveSpec> = self.output.into();
214
215        // Compute the error summary using UnitErrorDescription.
216        // Setup scripts don't have output_error_slice since that's only for
217        // tests (setup scripts can fail in all kinds of ways, while tests fail
218        // in more predictable ones).
219        let desc = UnitErrorDescription::new(UnitKind::Script, &output);
220        let error_summary = desc.all_error_list().map(|errors| ErrorSummary {
221            short_message: errors.short_message(),
222            description: DisplayErrorChain::new(errors).to_string(),
223        });
224
225        SetupScriptExecuteStatus {
226            output,
227            result: self.result.into(),
228            start_time: self.stopwatch_end.start_time.fixed_offset(),
229            time_taken: self.stopwatch_end.active,
230            is_slow: self.slow_after.is_some(),
231            env_map: self.env_map,
232            error_summary,
233        }
234    }
235}
236
/// Events sent from the dispatcher to individual unit execution tasks.
#[derive(Clone, Debug)]
pub(super) enum RunUnitRequest<'a> {
    /// A signal-related request; see [`SignalRequest`] for the possible kinds.
    Signal(SignalRequest),
    /// Non-signal cancellation requests (e.g. test failures) which should cause
    /// tests to exit in some states.
    OtherCancel,
    /// A request for information about the unit; see [`RunUnitQuery`].
    Query(RunUnitQuery<'a>),
}
246
247impl<'a> RunUnitRequest<'a> {
248    pub(super) fn drain(self, status: UnitExecuteStatus<'a, '_>) {
249        match self {
250            #[cfg(unix)]
251            Self::Signal(SignalRequest::Stop(sender)) => {
252                // The receiver being dead isn't really important.
253                let _ = sender.send(());
254            }
255            #[cfg(unix)]
256            Self::Signal(SignalRequest::Continue) => {}
257            Self::Signal(SignalRequest::Shutdown(_)) => {}
258            Self::OtherCancel => {}
259            Self::Query(RunUnitQuery::GetInfo(tx)) => {
260                // The receiver being dead isn't really important.
261                _ = tx.send(status.info_response());
262            }
263        }
264    }
265}
266
/// Signal-related requests, delivered to units wrapped in
/// `RunUnitRequest::Signal`.
///
/// The `Stop` and `Continue` variants only exist on Unix targets.
#[derive(Clone, Debug)]
pub(super) enum SignalRequest {
    // The mpsc sender is used by each test to indicate that the stop signal has been sent.
    #[cfg(unix)]
    Stop(UnboundedSender<()>),
    // NOTE(review): presumably the counterpart of `Stop`, resuming a stopped
    // unit -- confirm against the handler.
    #[cfg(unix)]
    Continue,
    // A shutdown request; the payload says which kind (see `ShutdownRequest`).
    Shutdown(ShutdownRequest),
}
276
/// The kind of shutdown being requested.
///
/// NOTE(review): going by the variant names, `Once` is the first shutdown
/// request (carrying the event that triggered it) and `Twice` a repeated one
/// -- confirm against the code that constructs these.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(super) enum ShutdownRequest {
    Once(ShutdownEvent),
    Twice,
}
282
/// Queries that can be sent to a unit, wrapped in `RunUnitRequest::Query`.
#[derive(Clone, Debug)]
pub(super) enum RunUnitQuery<'a> {
    /// Asks for the unit's [`InfoResponse`], which is to be sent back over the
    /// enclosed channel.
    GetInfo(UnboundedSender<InfoResponse<'a>>),
}
287
/// The reason for a termination, as tracked inside the runner.
///
/// NOTE(review): the variant descriptions below follow from the names and
/// payload types only -- confirm against the code that uses them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(super) enum InternalTerminateReason {
    /// Termination because of a timeout.
    Timeout,
    /// Termination because of a signal, identified by the shutdown request.
    Signal(ShutdownRequest),
}
293
/// The final state of a runner task.
pub(super) enum RunnerTaskState {
    /// The task finished; `child_join_errors` holds the join errors collected
    /// from its children (empty when it ran none, see
    /// `finished_no_children`).
    Finished { child_join_errors: Vec<JoinError> },
    /// The task was cancelled.
    Cancelled,
}
298
299impl RunnerTaskState {
300    /// Mark a runner task as finished and having not run any children.
301    pub(super) fn finished_no_children() -> Self {
302        Self::Finished {
303            child_join_errors: Vec::new(),
304        }
305    }
306}
307
/// The outcome of handling a signal.
///
/// Marked `#[must_use]`, so callers get a warning if they ignore it.
#[derive(Clone, Copy, Debug)]
#[must_use]
pub(super) enum HandleSignalResult {
    /// A job control signal was delivered.
    #[cfg(unix)]
    JobControl,

    /// The child was terminated.
    // On non-Windows targets the payload is expected to trigger the
    // `dead_code` lint (i.e. to go unread there); `expect` makes the build
    // flag it if that ever stops being true.
    #[cfg_attr(not(windows), expect(dead_code))]
    Terminated(TerminateChildResult),
}
319
/// The outcome of terminating a child process: whether it exited without
/// being forcibly killed, or was forcibly killed.
///
/// Marked `#[must_use]`, so callers get a warning if they ignore it.
#[derive(Clone, Copy, Debug)]
#[must_use]
pub(super) enum TerminateChildResult {
    /// The child process exited without being forcibly killed.
    Exited,

    /// The child process was forcibly killed.
    Killed,
}