use super::{
    format::{
        CARGO_METADATA_JSON_PATH, OutputDict, RECORD_OPTS_JSON_PATH, RUN_LOG_FILE_NAME,
        STDERR_DICT_PATH, STDOUT_DICT_PATH, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH,
    },
    summary::{RecordOpts, TestEventSummary, ZipStoreOutput},
};
use crate::{
    errors::RecordReadError,
    record::format::{RERUN_INFO_JSON_PATH, RerunInfo},
    user_config::elements::MAX_MAX_OUTPUT_SIZE,
};
use camino::{Utf8Path, Utf8PathBuf};
use debug_ignore::DebugIgnore;
use nextest_metadata::TestListSummary;
use std::{
    fs::File,
    io::{BufRead, BufReader, Read},
};
use zip::{ZipArchive, result::ZipError};

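/// Reads back a recorded test run from its run directory.
///
/// The store archive (`STORE_ZIP_FILE_NAME`) is opened lazily on first access, and the
/// zstd dictionaries for stdout and stderr are held in memory once
/// [`load_dictionaries`](Self::load_dictionaries) has been called.
///
/// A minimal usage sketch, not part of this module's API surface: `dump_events` and
/// `run_dir` are illustrative names, and error handling is reduced to `?`.
///
/// ```ignore
/// fn dump_events(run_dir: &Utf8Path) -> Result<(), RecordReadError> {
///     let mut reader = RecordReader::open(run_dir)?;
///     // Load stdout/stderr dictionaries before reading any test output.
///     reader.load_dictionaries()?;
///     for event in reader.events()? {
///         let _event = event?;
///         // ...inspect the recorded event here
///     }
///     Ok(())
/// }
/// ```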
#[derive(Debug)]
pub struct RecordReader {
    run_dir: Utf8PathBuf,
    archive: Option<ZipArchive<File>>,
    stdout_dict: Option<Vec<u8>>,
    stderr_dict: Option<Vec<u8>>,
}

impl RecordReader {
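    /// Opens a recorded run rooted at `run_dir`.
    ///
    /// Returns [`RecordReadError::RunNotFound`] if the directory does not exist. The
    /// store archive itself is not opened until it is first needed.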
    pub fn open(run_dir: &Utf8Path) -> Result<Self, RecordReadError> {
        if !run_dir.exists() {
            return Err(RecordReadError::RunNotFound {
                path: run_dir.to_owned(),
            });
        }

        Ok(Self {
            run_dir: run_dir.to_owned(),
            archive: None,
            stdout_dict: None,
            stderr_dict: None,
        })
    }

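    /// Returns the run directory this reader was opened with.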
    pub fn run_dir(&self) -> &Utf8Path {
        &self.run_dir
    }

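    /// Opens the store archive on first use and returns a mutable handle to it.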
    fn ensure_archive(&mut self) -> Result<&mut ZipArchive<File>, RecordReadError> {
        if self.archive.is_none() {
            let store_path = self.run_dir.join(STORE_ZIP_FILE_NAME);
            let file = File::open(&store_path).map_err(|error| RecordReadError::OpenArchive {
                path: store_path,
                error,
            })?;
            let archive =
                ZipArchive::new(file).map_err(|error| RecordReadError::ReadArchiveFile {
                    file_name: STORE_ZIP_FILE_NAME.to_string(),
                    error,
                })?;
            self.archive = Some(archive);
        }
        Ok(self.archive.as_mut().expect("archive was just set"))
    }

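    /// Reads a single file out of the store archive into memory.
    ///
    /// The size the archive claims for the file is checked against
    /// `MAX_MAX_OUTPUT_SIZE` before anything is read, the read itself is capped at that
    /// limit, and the number of bytes actually read must match the claimed size. This
    /// guards against corrupt or maliciously crafted archives.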
    fn read_archive_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
        let archive = self.ensure_archive()?;
        let file =
            archive
                .by_name(file_name)
                .map_err(|error| RecordReadError::ReadArchiveFile {
                    file_name: file_name.to_string(),
                    error,
                })?;

        let claimed_size = file.size();
        if claimed_size > limit {
            return Err(RecordReadError::FileTooLarge {
                file_name: file_name.to_string(),
                size: claimed_size,
                limit,
            });
        }

        let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
        let mut contents = Vec::with_capacity(capacity);

        file.take(limit)
            .read_to_end(&mut contents)
            .map_err(|error| RecordReadError::Decompress {
                file_name: file_name.to_string(),
                error,
            })?;

        let actual_size = contents.len() as u64;
        if actual_size != claimed_size {
            return Err(RecordReadError::SizeMismatch {
                file_name: file_name.to_string(),
                claimed_size,
                actual_size,
            });
        }

        Ok(contents)
    }

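    /// Reads the recorded `cargo metadata` output as a JSON string.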
    pub fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
        let bytes = self.read_archive_file(CARGO_METADATA_JSON_PATH)?;
        String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
            file_name: CARGO_METADATA_JSON_PATH.to_string(),
            error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
        })
    }

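    /// Reads and deserializes the recorded test list.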
    pub fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
        let bytes = self.read_archive_file(TEST_LIST_JSON_PATH)?;
        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
            file_name: TEST_LIST_JSON_PATH.to_string(),
            error,
        })
    }

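    /// Reads and deserializes the options the run was recorded with.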
    pub fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
        let bytes = self.read_archive_file(RECORD_OPTS_JSON_PATH)?;
        serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
            file_name: RECORD_OPTS_JSON_PATH.to_string(),
            error,
        })
    }

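    /// Reads rerun information, if this record has any.
    ///
    /// Returns `Ok(None)` if the archive does not contain a rerun info file.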
    pub fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
        match self.read_archive_file(RERUN_INFO_JSON_PATH) {
            Ok(bytes) => {
                let info = serde_json::from_slice(&bytes).map_err(|error| {
                    RecordReadError::DeserializeMetadata {
                        file_name: RERUN_INFO_JSON_PATH.to_string(),
                        error,
                    }
                })?;
                Ok(Some(info))
            }
            Err(RecordReadError::ReadArchiveFile {
                error: ZipError::FileNotFound,
                ..
            }) => Ok(None),
            Err(e) => Err(e),
        }
    }

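    /// Loads the zstd dictionaries for stdout and stderr output from the archive into
    /// memory.
    ///
    /// This must be called before [`read_output`](Self::read_output) is used on stdout
    /// or stderr files; otherwise the dictionary lookup panics.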
    pub fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
        self.stdout_dict = Some(self.read_archive_file(STDOUT_DICT_PATH)?);
        self.stderr_dict = Some(self.read_archive_file(STDERR_DICT_PATH)?);
        Ok(())
    }

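    /// Returns an iterator over the test events recorded in the run log.
    ///
    /// The run log is a zstd-compressed stream of JSON lines, decoded and parsed one
    /// line at a time.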
    pub fn events(&self) -> Result<RecordEventIter, RecordReadError> {
        let log_path = self.run_dir.join(RUN_LOG_FILE_NAME);
        let file = File::open(&log_path).map_err(|error| RecordReadError::OpenRunLog {
            path: log_path.clone(),
            error,
        })?;
        let decoder =
            zstd::stream::Decoder::new(file).map_err(|error| RecordReadError::OpenRunLog {
                path: log_path,
                error,
            })?;
        Ok(RecordEventIter {
            reader: DebugIgnore(BufReader::new(decoder)),
            line_buf: String::new(),
            line_number: 0,
        })
    }

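    /// Reads and decompresses a single stored test output file (under `out/` in the
    /// archive).
    ///
    /// The output is decompressed with the dictionary matching its file name, so
    /// [`load_dictionaries`](Self::load_dictionaries) must have been called first for
    /// stdout/stderr outputs. The decompressed size is capped at `MAX_MAX_OUTPUT_SIZE`.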
    pub fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
        let path = format!("out/{file_name}");
        let compressed = self.read_archive_file(&path)?;
        let limit = MAX_MAX_OUTPUT_SIZE.as_u64();

        let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
            RecordReadError::UnknownOutputType {
                file_name: file_name.to_owned(),
            }
        })?;

        decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
            RecordReadError::Decompress {
                file_name: path,
                error,
            }
        })
    }

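    /// Returns the decompression dictionary for an output file name, or `None` if the
    /// file name does not correspond to stdout or stderr output.
    ///
    /// Panics if the dictionaries have not been loaded via
    /// [`load_dictionaries`](Self::load_dictionaries).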
    fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
        match OutputDict::for_output_file_name(file_name) {
            OutputDict::Stdout => Some(
                self.stdout_dict
                    .as_ref()
                    .expect("load_dictionaries must be called first"),
            ),
            OutputDict::Stderr => Some(
                self.stderr_dict
                    .as_ref()
                    .expect("load_dictionaries must be called first"),
            ),
            OutputDict::None => None,
        }
    }
}

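/// Decompresses a zstd frame using the given dictionary, reading at most `limit` bytes
/// of decompressed data.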
fn decompress_with_dict(
    compressed: &[u8],
    dict_bytes: &[u8],
    limit: u64,
) -> std::io::Result<Vec<u8>> {
    let dict = zstd::dict::DecoderDictionary::copy(dict_bytes);
    let decoder = zstd::stream::Decoder::with_prepared_dictionary(compressed, &dict)?;
    let mut decompressed = Vec::new();
    decoder.take(limit).read_to_end(&mut decompressed)?;
    Ok(decompressed)
}

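/// The zstd decoder type used to read the run log.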
type LogDecoder = zstd::stream::Decoder<'static, BufReader<File>>;

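/// Iterator over the test events stored in a recorded run log.
///
/// Each item is one JSON line from the zstd-compressed run log, deserialized into a
/// [`TestEventSummary`]. Blank lines are skipped.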
#[derive(Debug)]
pub struct RecordEventIter {
    reader: DebugIgnore<BufReader<LogDecoder>>,
    line_buf: String,
    line_number: usize,
}

impl Iterator for RecordEventIter {
    type Item = Result<TestEventSummary<ZipStoreOutput>, RecordReadError>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            self.line_buf.clear();
            self.line_number += 1;

            match self.reader.read_line(&mut self.line_buf) {
                Ok(0) => return None,
                Ok(_) => {
                    let trimmed = self.line_buf.trim();
                    if trimmed.is_empty() {
                        continue;
                    }
                    return Some(serde_json::from_str(trimmed).map_err(|error| {
                        RecordReadError::ParseEvent {
                            line_number: self.line_number,
                            error,
                        }
                    }));
                }
                Err(error) => {
                    return Some(Err(RecordReadError::ReadRunLog {
                        line_number: self.line_number,
                        error,
                    }));
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_record_reader_nonexistent_dir() {
        let result = RecordReader::open(Utf8Path::new("/nonexistent/path"));
        assert!(matches!(result, Err(RecordReadError::RunNotFound { .. })));
    }
}