nextest_runner/record/reader.rs

// Copyright (c) The nextest Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Reading logic for recorded test runs.
//!
//! The [`RecordReader`] reads a recorded test run from disk, providing access
//! to metadata and events stored during the run.
9use super::{
10    format::{
11        CARGO_METADATA_JSON_PATH, OutputDict, RECORD_OPTS_JSON_PATH, RUN_LOG_FILE_NAME,
12        STDERR_DICT_PATH, STDOUT_DICT_PATH, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH,
13    },
14    summary::{RecordOpts, TestEventSummary, ZipStoreOutput},
15};
16use crate::{
17    errors::RecordReadError,
18    record::format::{RERUN_INFO_JSON_PATH, RerunInfo},
19    user_config::elements::MAX_MAX_OUTPUT_SIZE,
20};
21use camino::{Utf8Path, Utf8PathBuf};
22use debug_ignore::DebugIgnore;
23use nextest_metadata::TestListSummary;
24use std::{
25    fs::File,
26    io::{BufRead, BufReader, Read},
27};
28use zip::{ZipArchive, result::ZipError};
29
/// Reader for a recorded test run.
///
/// Provides access to the metadata and events stored during a test run.
/// The archive is opened lazily when methods are called.
#[derive(Debug)]
pub struct RecordReader {
    /// Directory that contains the run's `store.zip` and run log.
    run_dir: Utf8PathBuf,
    /// Lazily-opened `store.zip` archive; `None` until first accessed.
    archive: Option<ZipArchive<File>>,
    /// Cached stdout dictionary loaded from the archive.
    stdout_dict: Option<Vec<u8>>,
    /// Cached stderr dictionary loaded from the archive.
    stderr_dict: Option<Vec<u8>>,
}
43
44impl RecordReader {
45    /// Opens a recorded run from its directory.
46    ///
47    /// The directory should contain `store.zip` and `run.log.zst`.
48    pub fn open(run_dir: &Utf8Path) -> Result<Self, RecordReadError> {
49        if !run_dir.exists() {
50            return Err(RecordReadError::RunNotFound {
51                path: run_dir.to_owned(),
52            });
53        }
54
55        Ok(Self {
56            run_dir: run_dir.to_owned(),
57            archive: None,
58            stdout_dict: None,
59            stderr_dict: None,
60        })
61    }
62
63    /// Returns the path to the run directory.
64    pub fn run_dir(&self) -> &Utf8Path {
65        &self.run_dir
66    }
67
68    /// Opens the zip archive if not already open.
69    fn ensure_archive(&mut self) -> Result<&mut ZipArchive<File>, RecordReadError> {
70        if self.archive.is_none() {
71            let store_path = self.run_dir.join(STORE_ZIP_FILE_NAME);
72            let file = File::open(&store_path).map_err(|error| RecordReadError::OpenArchive {
73                path: store_path,
74                error,
75            })?;
76            let archive =
77                ZipArchive::new(file).map_err(|error| RecordReadError::ReadArchiveFile {
78                    file_name: STORE_ZIP_FILE_NAME.to_string(),
79                    error,
80                })?;
81            self.archive = Some(archive);
82        }
83        Ok(self.archive.as_mut().expect("archive was just set"))
84    }
85
86    /// Reads a file from the archive as bytes, with size limit.
87    ///
88    /// The size limit prevents malicious archives from causing OOM by
89    /// specifying a huge decompressed size. The limit is checked against the
90    /// claimed size in the ZIP header, and `take()` is used during decompression
91    /// to guard against spoofed headers.
92    ///
93    /// Since nextest controls archive creation, any mismatch between the header
94    /// size and actual size indicates corruption or tampering.
95    fn read_archive_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
96        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
97        let archive = self.ensure_archive()?;
98        let file =
99            archive
100                .by_name(file_name)
101                .map_err(|error| RecordReadError::ReadArchiveFile {
102                    file_name: file_name.to_string(),
103                    error,
104                })?;
105
106        let claimed_size = file.size();
107        if claimed_size > limit {
108            return Err(RecordReadError::FileTooLarge {
109                file_name: file_name.to_string(),
110                size: claimed_size,
111                limit,
112            });
113        }
114
115        let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
116        let mut contents = Vec::with_capacity(capacity);
117
118        file.take(limit)
119            .read_to_end(&mut contents)
120            .map_err(|error| RecordReadError::Decompress {
121                file_name: file_name.to_string(),
122                error,
123            })?;
124
125        let actual_size = contents.len() as u64;
126        if actual_size != claimed_size {
127            return Err(RecordReadError::SizeMismatch {
128                file_name: file_name.to_string(),
129                claimed_size,
130                actual_size,
131            });
132        }
133
134        Ok(contents)
135    }
136
137    /// Returns the cargo metadata JSON from the archive.
138    pub fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
139        let bytes = self.read_archive_file(CARGO_METADATA_JSON_PATH)?;
140        String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
141            file_name: CARGO_METADATA_JSON_PATH.to_string(),
142            error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
143        })
144    }
145
146    /// Returns the test list from the archive.
147    pub fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
148        let bytes = self.read_archive_file(TEST_LIST_JSON_PATH)?;
149        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
150            file_name: TEST_LIST_JSON_PATH.to_string(),
151            error,
152        })
153    }
154
155    /// Returns the record options from the archive.
156    pub fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
157        let bytes = self.read_archive_file(RECORD_OPTS_JSON_PATH)?;
158        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
159            file_name: RECORD_OPTS_JSON_PATH.to_string(),
160            error,
161        })
162    }
163
164    /// Returns the rerun info from the archive, if this is a rerun.
165    ///
166    /// Returns `Ok(None)` if this run is not a rerun (the file doesn't exist).
167    /// Returns `Err` if the file exists but cannot be read or parsed.
168    pub fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
169        match self.read_archive_file(RERUN_INFO_JSON_PATH) {
170            Ok(bytes) => {
171                let info = serde_json::from_slice(&bytes).map_err(|error| {
172                    RecordReadError::DeserializeMetadata {
173                        file_name: RERUN_INFO_JSON_PATH.to_string(),
174                        error,
175                    }
176                })?;
177                Ok(Some(info))
178            }
179            Err(RecordReadError::ReadArchiveFile {
180                error: ZipError::FileNotFound,
181                ..
182            }) => {
183                // File doesn't exist; this is not a rerun.
184                Ok(None)
185            }
186            Err(e) => Err(e),
187        }
188    }
189
190    /// Loads the dictionaries from the archive.
191    ///
192    /// This must be called before reading output files. The dictionaries are
193    /// used for decompressing test output.
194    ///
195    /// Note: The store format version is checked before opening the archive,
196    /// using the `store_format_version` field in runs.json.zst. This method
197    /// assumes the version has already been validated.
198    pub fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
199        self.stdout_dict = Some(self.read_archive_file(STDOUT_DICT_PATH)?);
200        self.stderr_dict = Some(self.read_archive_file(STDERR_DICT_PATH)?);
201        Ok(())
202    }
203
204    /// Returns an iterator over events in the run log.
205    ///
206    /// Events are read one at a time from the zstd-compressed JSON Lines file.
207    pub fn events(&self) -> Result<RecordEventIter, RecordReadError> {
208        let log_path = self.run_dir.join(RUN_LOG_FILE_NAME);
209        let file = File::open(&log_path).map_err(|error| RecordReadError::OpenRunLog {
210            path: log_path.clone(),
211            error,
212        })?;
213        let decoder =
214            zstd::stream::Decoder::new(file).map_err(|error| RecordReadError::OpenRunLog {
215                path: log_path,
216                error,
217            })?;
218        Ok(RecordEventIter {
219            reader: DebugIgnore(BufReader::new(decoder)),
220            line_buf: String::new(),
221            line_number: 0,
222        })
223    }
224
225    /// Reads output for a specific file from the archive.
226    ///
227    /// The `file_name` should be the value from `ZipStoreOutput::file_name`,
228    /// e.g., "test-abc123-1-stdout".
229    ///
230    /// The [`OutputFileName`](crate::record::OutputFileName) type ensures that
231    /// file names are validated during deserialization, preventing path traversal.
232    ///
233    /// # Panics
234    ///
235    /// Panics if [`load_dictionaries`](Self::load_dictionaries) has not been called first.
236    pub fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
237        let path = format!("out/{file_name}");
238        let compressed = self.read_archive_file(&path)?;
239        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
240
241        // Output files are stored pre-compressed with zstd dictionaries.
242        // Unknown file types indicate a format revision that should have been
243        // rejected during version validation.
244        let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
245            RecordReadError::UnknownOutputType {
246                file_name: file_name.to_owned(),
247            }
248        })?;
249
250        decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
251            RecordReadError::Decompress {
252                file_name: path,
253                error,
254            }
255        })
256    }
257
258    /// Returns the dictionary bytes for the given output file name, if known.
259    ///
260    /// Returns `None` for unknown file types, which indicates a format revision
261    /// that should have been rejected during version validation.
262    ///
263    /// # Panics
264    ///
265    /// Panics if [`load_dictionaries`](Self::load_dictionaries) has not been called first.
266    fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
267        match OutputDict::for_output_file_name(file_name) {
268            OutputDict::Stdout => Some(
269                self.stdout_dict
270                    .as_ref()
271                    .expect("load_dictionaries must be called first"),
272            ),
273            OutputDict::Stderr => Some(
274                self.stderr_dict
275                    .as_ref()
276                    .expect("load_dictionaries must be called first"),
277            ),
278            OutputDict::None => None,
279        }
280    }
281}
282
283/// Decompresses data using a pre-trained zstd dictionary, with a size limit.
284///
285/// The limit prevents compression bombs where a small compressed payload
286/// expands to an extremely large decompressed output.
287fn decompress_with_dict(
288    compressed: &[u8],
289    dict_bytes: &[u8],
290    limit: u64,
291) -> std::io::Result<Vec<u8>> {
292    let dict = zstd::dict::DecoderDictionary::copy(dict_bytes);
293    let decoder = zstd::stream::Decoder::with_prepared_dictionary(compressed, &dict)?;
294    let mut decompressed = Vec::new();
295    decoder.take(limit).read_to_end(&mut decompressed)?;
296    Ok(decompressed)
297}
298
/// Zstd decoder reading from a file.
type LogDecoder = zstd::stream::Decoder<'static, BufReader<File>>;

/// Iterator over recorded events.
///
/// Reads events one at a time from the zstd-compressed JSON Lines run log.
#[derive(Debug)]
pub struct RecordEventIter {
    /// Buffered reader over the zstd-decoded run log; `DebugIgnore` keeps it
    /// out of `Debug` output.
    reader: DebugIgnore<BufReader<LogDecoder>>,
    /// Reusable line buffer, cleared before each read to avoid reallocating.
    line_buf: String,
    /// Number of the line currently being read (1-based), for error reporting.
    line_number: usize,
}
311
312impl Iterator for RecordEventIter {
313    type Item = Result<TestEventSummary<ZipStoreOutput>, RecordReadError>;
314
315    fn next(&mut self) -> Option<Self::Item> {
316        loop {
317            self.line_buf.clear();
318            self.line_number += 1;
319
320            match self.reader.read_line(&mut self.line_buf) {
321                Ok(0) => return None,
322                Ok(_) => {
323                    let trimmed = self.line_buf.trim();
324                    if trimmed.is_empty() {
325                        continue;
326                    }
327                    return Some(serde_json::from_str(trimmed).map_err(|error| {
328                        RecordReadError::ParseEvent {
329                            line_number: self.line_number,
330                            error,
331                        }
332                    }));
333                }
334                Err(error) => {
335                    return Some(Err(RecordReadError::ReadRunLog {
336                        line_number: self.line_number,
337                        error,
338                    }));
339                }
340            }
341        }
342    }
343}
344
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_record_reader_nonexistent_dir() {
        // Opening a directory that doesn't exist must fail with RunNotFound.
        let missing = Utf8Path::new("/nonexistent/path");
        let result = RecordReader::open(missing);
        assert!(matches!(result, Err(RecordReadError::RunNotFound { .. })));
    }
}