1use super::{
22 format::{
23 CARGO_METADATA_JSON_PATH, OutputDict, PORTABLE_MANIFEST_FILE_NAME,
24 PORTABLE_RECORDING_FORMAT_VERSION, PortableManifest, RECORD_OPTS_JSON_PATH,
25 RERUN_INFO_JSON_PATH, RUN_LOG_FILE_NAME, RerunInfo, STDERR_DICT_PATH, STDOUT_DICT_PATH,
26 STORE_FORMAT_VERSION, STORE_ZIP_FILE_NAME, TEST_LIST_JSON_PATH, has_zip_extension,
27 stored_file_options,
28 },
29 reader::{StoreReader, decompress_with_dict},
30 store::{RecordedRunInfo, RunFilesExist, StoreRunsDir},
31 summary::{RecordOpts, TestEventSummary},
32};
33use crate::{
34 errors::{PortableRecordingError, PortableRecordingReadError, RecordReadError},
35 output_spec::RecordingSpec,
36 user_config::elements::MAX_MAX_OUTPUT_SIZE,
37};
38use atomicwrites::{AtomicFile, OverwriteBehavior};
39use bytesize::ByteSize;
40use camino::{Utf8Path, Utf8PathBuf};
41use countio::Counter;
42use debug_ignore::DebugIgnore;
43use eazip::{Archive, ArchiveWriter, CompressionMethod};
44use itertools::Either;
45use nextest_metadata::TestListSummary;
46use std::{
47 borrow::Cow,
48 fs::File,
49 io::{self, BufRead, BufReader, Cursor, Read, Seek, SeekFrom, Write},
50};
51
/// Outcome of successfully writing a portable recording archive.
#[derive(Debug)]
pub struct PortableRecordingResult {
    /// Path the archive was written to.
    pub path: Utf8PathBuf,
    /// Size of the written archive, in bytes.
    pub size: u64,
}
60
/// Outcome of extracting a single file from the outer archive to disk.
#[derive(Debug)]
pub struct ExtractOuterFileResult {
    /// Number of bytes copied to the output file.
    pub bytes_written: u64,
    /// `Some(claimed_size)` when limit checking was requested and the size
    /// recorded in the archive metadata exceeds `MAX_MAX_OUTPUT_SIZE`;
    /// `None` otherwise. The file is extracted in full either way.
    pub exceeded_limit: Option<u64>,
}
72
/// Packages a recorded run's on-disk files into a single portable zip
/// archive.
#[derive(Debug)]
pub struct PortableRecordingWriter<'a> {
    /// Metadata of the run being exported; serialized into the manifest.
    run_info: &'a RecordedRunInfo,
    /// Directory holding the run's `STORE_ZIP_FILE_NAME` and
    /// `RUN_LOG_FILE_NAME` files.
    run_dir: Utf8PathBuf,
}
79
80impl<'a> PortableRecordingWriter<'a> {
81 pub fn new(
85 run_info: &'a RecordedRunInfo,
86 runs_dir: StoreRunsDir<'_>,
87 ) -> Result<Self, PortableRecordingError> {
88 let run_dir = runs_dir.run_dir(run_info.run_id);
89
90 if !run_dir.exists() {
91 return Err(PortableRecordingError::RunDirNotFound { path: run_dir });
92 }
93
94 let store_zip_path = run_dir.join(STORE_ZIP_FILE_NAME);
95 if !store_zip_path.exists() {
96 return Err(PortableRecordingError::RequiredFileMissing {
97 run_dir,
98 file_name: STORE_ZIP_FILE_NAME,
99 });
100 }
101
102 let run_log_path = run_dir.join(RUN_LOG_FILE_NAME);
103 if !run_log_path.exists() {
104 return Err(PortableRecordingError::RequiredFileMissing {
105 run_dir,
106 file_name: RUN_LOG_FILE_NAME,
107 });
108 }
109
110 Ok(Self { run_info, run_dir })
111 }
112
113 pub fn default_filename(&self) -> String {
117 format!("nextest-run-{}.zip", self.run_info.run_id)
118 }
119
120 pub fn write_to_dir(
125 &self,
126 output_dir: &Utf8Path,
127 ) -> Result<PortableRecordingResult, PortableRecordingError> {
128 let output_path = output_dir.join(self.default_filename());
129 self.write_to_path(&output_path)
130 }
131
132 pub fn write_to_path(
136 &self,
137 output_path: &Utf8Path,
138 ) -> Result<PortableRecordingResult, PortableRecordingError> {
139 let atomic_file = AtomicFile::new(output_path, OverwriteBehavior::AllowOverwrite);
140
141 let final_size = atomic_file
142 .write(|temp_file| {
143 let counter = Counter::new(temp_file);
144 let mut zip_writer = ArchiveWriter::new(counter);
145
146 self.write_manifest(&mut zip_writer)?;
147 self.copy_file(&mut zip_writer, RUN_LOG_FILE_NAME)?;
148 self.copy_file(&mut zip_writer, STORE_ZIP_FILE_NAME)?;
149
150 let counter = zip_writer
151 .finish()
152 .map_err(PortableRecordingError::ZipFinalize)?;
153
154 let counter_bytes = counter.writer_bytes() as u64;
159 let file = counter.into_inner();
160 let size = file.metadata().map(|m| m.len()).unwrap_or(counter_bytes);
161
162 Ok(size)
163 })
164 .map_err(|err| match err {
165 atomicwrites::Error::Internal(source) => PortableRecordingError::AtomicWrite {
166 path: output_path.to_owned(),
167 source,
168 },
169 atomicwrites::Error::User(e) => e,
170 })?;
171
172 Ok(PortableRecordingResult {
173 path: output_path.to_owned(),
174 size: final_size,
175 })
176 }
177
178 fn write_manifest<W: Write>(
180 &self,
181 zip_writer: &mut ArchiveWriter<W>,
182 ) -> Result<(), PortableRecordingError> {
183 let manifest = PortableManifest::new(self.run_info);
184 let manifest_json = serde_json::to_vec_pretty(&manifest)
185 .map_err(PortableRecordingError::SerializeManifest)?;
186
187 let options = stored_file_options();
188
189 zip_writer
190 .add_file(PORTABLE_MANIFEST_FILE_NAME, &manifest_json[..], &options)
191 .map_err(|source| PortableRecordingError::ZipWrite {
192 file_name: PORTABLE_MANIFEST_FILE_NAME,
193 source,
194 })?;
195
196 Ok(())
197 }
198
199 fn copy_file<W: Write>(
204 &self,
205 zip_writer: &mut ArchiveWriter<W>,
206 file_name: &'static str,
207 ) -> Result<(), PortableRecordingError> {
208 let source_path = self.run_dir.join(file_name);
209 let mut file = File::open(&source_path)
210 .map_err(|source| PortableRecordingError::ReadFile { file_name, source })?;
211
212 let options = stored_file_options();
213
214 let mut streamer = zip_writer
215 .stream_file(file_name, &options)
216 .map_err(|source| PortableRecordingError::ZipStartFile { file_name, source })?;
217
218 io::copy(&mut file, &mut streamer)
219 .map_err(|source| PortableRecordingError::ZipWrite { file_name, source })?;
220
221 streamer
222 .finish()
223 .map_err(|source| PortableRecordingError::ZipWrite { file_name, source })?;
224
225 Ok(())
226 }
227}
228
/// Maximum number of bytes (4 GiB) that `ensure_seekable` will spool from a
/// non-seekable input into a temporary file.
const SPOOL_SIZE_LIMIT: ByteSize = ByteSize(4 * 1024 * 1024 * 1024);
239
/// Classification of a Windows file handle, derived from `GetFileType`.
#[cfg(windows)]
enum WindowsFileKind {
    /// `GetFileType` reported `FILE_TYPE_DISK`.
    Disk,
    /// `GetFileType` reported `FILE_TYPE_PIPE`.
    Pipe,
    /// Any other value; carries the raw value returned by `GetFileType`.
    Other(u32),
}
256
/// Classifies the handle behind `file` using the Win32 `GetFileType` call.
#[cfg(windows)]
fn classify_windows_handle(file: &File) -> WindowsFileKind {
    use std::os::windows::io::AsRawHandle;
    use windows_sys::Win32::Storage::FileSystem::{FILE_TYPE_DISK, FILE_TYPE_PIPE, GetFileType};

    // SAFETY: the handle is obtained from a live `&File`, so it refers to an
    // open handle for the duration of this call.
    match unsafe { GetFileType(file.as_raw_handle()) } {
        FILE_TYPE_DISK => WindowsFileKind::Disk,
        FILE_TYPE_PIPE => WindowsFileKind::Pipe,
        unknown => WindowsFileKind::Other(unknown),
    }
}
271
272#[cfg(unix)]
275fn is_not_seekable_error(e: &io::Error) -> bool {
276 e.raw_os_error() == Some(libc::ESPIPE)
278}
279
/// Returns a seekable file for `file`, spooling non-seekable input into a
/// temporary file capped at `SPOOL_SIZE_LIMIT`.
///
/// `path` is only used to label errors.
fn ensure_seekable(file: File, path: &Utf8Path) -> Result<File, PortableRecordingReadError> {
    ensure_seekable_impl(file, path, SPOOL_SIZE_LIMIT)
}
291
/// Implementation of `ensure_seekable` with a caller-supplied spool limit.
///
/// On Unix the file's stream position is probed: success returns the file
/// unchanged, an `ESPIPE` failure spools it to a temporary file, and any
/// other failure is reported as `SeekProbe`.
///
/// On Windows the handle type decides: disk files are returned unchanged,
/// pipes are spooled, and every other handle type is reported as `SeekProbe`.
fn ensure_seekable_impl(
    file: File,
    path: &Utf8Path,
    spool_limit: ByteSize,
) -> Result<File, PortableRecordingReadError> {
    #[cfg(unix)]
    {
        // `stream_position` takes `&mut self`, hence the rebinding.
        let mut file = file;
        match file.stream_position() {
            Ok(_) => Ok(file),
            Err(e) if is_not_seekable_error(&e) => spool_to_temp(file, path, spool_limit),
            Err(e) => {
                // A probe failure other than "not seekable" is surfaced
                // rather than being treated as a reason to spool.
                Err(PortableRecordingReadError::SeekProbe {
                    path: path.to_owned(),
                    error: e,
                })
            }
        }
    }

    #[cfg(windows)]
    match classify_windows_handle(&file) {
        WindowsFileKind::Disk => Ok(file),
        WindowsFileKind::Pipe => spool_to_temp(file, path, spool_limit),
        WindowsFileKind::Other(file_type) => Err(PortableRecordingReadError::SeekProbe {
            path: path.to_owned(),
            error: io::Error::other(format!(
                "unexpected file handle type {file_type:#x} (expected disk or pipe)"
            )),
        }),
    }
}
334
335fn spool_to_temp(
339 file: File,
340 path: &Utf8Path,
341 spool_limit: ByteSize,
342) -> Result<File, PortableRecordingReadError> {
343 let mut temp =
344 camino_tempfile::tempfile().map_err(|error| PortableRecordingReadError::SpoolTempFile {
345 path: path.to_owned(),
346 error,
347 })?;
348
349 let bytes_copied = io::copy(
353 &mut (&file).take(spool_limit.0.saturating_add(1)),
354 &mut temp,
355 )
356 .map_err(|error| PortableRecordingReadError::SpoolTempFile {
357 path: path.to_owned(),
358 error,
359 })?;
360
361 if bytes_copied > spool_limit.0 {
362 return Err(PortableRecordingReadError::SpoolTooLarge {
363 path: path.to_owned(),
364 limit: spool_limit,
365 });
366 }
367
368 temp.seek(SeekFrom::Start(0))
370 .map_err(|error| PortableRecordingReadError::SpoolTempFile {
371 path: path.to_owned(),
372 error,
373 })?;
374
375 Ok(temp)
376}
377
/// Backing storage for the archive being read: `Left` is a file on disk
/// (possibly a spooled temporary file), `Right` is an in-memory buffer used
/// when the recording was found inside a wrapper zip.
type ArchiveReadStorage = Either<File, Cursor<Vec<u8>>>;
383
/// A portable recording archive opened for reading.
pub struct PortableRecording {
    /// Path the archive was opened from; used to label errors.
    archive_path: Utf8PathBuf,
    /// Manifest parsed from the archive and validated in `open_validated`.
    manifest: PortableManifest,
    /// The archive that directly contains the manifest, run log and store
    /// zip.
    outer_archive: Archive<BufReader<ArchiveReadStorage>>,
}
390
391impl std::fmt::Debug for PortableRecording {
392 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393 f.debug_struct("PortableRecording")
394 .field("archive_path", &self.archive_path)
395 .field("manifest", &self.manifest)
396 .finish_non_exhaustive()
397 }
398}
399
/// Reports presence of the run's files by looking them up in the archive's
/// index.
impl RunFilesExist for PortableRecording {
    /// Returns `true` if the archive has an entry named
    /// `STORE_ZIP_FILE_NAME`.
    fn store_zip_exists(&self) -> bool {
        self.outer_archive.index_of(STORE_ZIP_FILE_NAME).is_some()
    }

    /// Returns `true` if the archive has an entry named `RUN_LOG_FILE_NAME`.
    fn run_log_exists(&self) -> bool {
        self.outer_archive.index_of(RUN_LOG_FILE_NAME).is_some()
    }
}
409
410impl PortableRecording {
411 pub fn open(path: &Utf8Path) -> Result<Self, PortableRecordingReadError> {
421 let file = File::open(path).map_err(|error| PortableRecordingReadError::OpenArchive {
422 path: path.to_owned(),
423 error,
424 })?;
425
426 let file = ensure_seekable(file, path)?;
429
430 let mut outer_archive =
431 Archive::new(BufReader::new(Either::Left(file))).map_err(|error| {
432 PortableRecordingReadError::ReadArchive {
433 path: path.to_owned(),
434 error,
435 }
436 })?;
437
438 if outer_archive
440 .index_of(PORTABLE_MANIFEST_FILE_NAME)
441 .is_some()
442 {
443 return Self::open_validated(path, outer_archive);
444 }
445
446 let mut file_count = 0;
450 let mut zip_count = 0;
451 let mut zip_file: Option<String> = None;
452 for metadata in outer_archive.entries() {
453 let name = metadata.name();
454 if name.ends_with('/') || name.ends_with('\\') {
455 continue;
457 }
458 file_count += 1;
459 if has_zip_extension(Utf8Path::new(name)) {
460 zip_count += 1;
461 if zip_count == 1 {
462 zip_file = Some(name.to_owned());
463 }
464 }
465 }
466
467 if let Some(inner_name) = zip_file.filter(|_| file_count == 1 && zip_count == 1) {
468 let inner_bytes = read_outer_file(&mut outer_archive, inner_name.into(), path)?;
473 let inner_archive = Archive::new(BufReader::new(Either::Right(Cursor::new(
474 inner_bytes,
475 ))))
476 .map_err(|error| PortableRecordingReadError::ReadArchive {
477 path: path.to_owned(),
478 error,
479 })?;
480 Self::open_validated(path, inner_archive)
481 } else {
482 Err(PortableRecordingReadError::NotAWrapperArchive {
483 path: path.to_owned(),
484 file_count,
485 zip_count,
486 })
487 }
488 }
489
490 fn open_validated(
492 path: &Utf8Path,
493 mut outer_archive: Archive<BufReader<ArchiveReadStorage>>,
494 ) -> Result<Self, PortableRecordingReadError> {
495 let manifest_bytes =
497 read_outer_file(&mut outer_archive, PORTABLE_MANIFEST_FILE_NAME.into(), path)?;
498 let manifest: PortableManifest =
499 serde_json::from_slice(&manifest_bytes).map_err(|error| {
500 PortableRecordingReadError::ParseManifest {
501 path: path.to_owned(),
502 error,
503 }
504 })?;
505
506 if let Err(incompatibility) = manifest
508 .format_version
509 .check_readable_by(PORTABLE_RECORDING_FORMAT_VERSION)
510 {
511 return Err(PortableRecordingReadError::UnsupportedFormatVersion {
512 path: path.to_owned(),
513 found: manifest.format_version,
514 supported: PORTABLE_RECORDING_FORMAT_VERSION,
515 incompatibility,
516 });
517 }
518
519 let store_version = manifest.store_format_version();
521 if let Err(incompatibility) = store_version.check_readable_by(STORE_FORMAT_VERSION) {
522 return Err(PortableRecordingReadError::UnsupportedStoreFormatVersion {
523 path: path.to_owned(),
524 found: store_version,
525 supported: STORE_FORMAT_VERSION,
526 incompatibility,
527 });
528 }
529
530 Ok(Self {
531 archive_path: path.to_owned(),
532 manifest,
533 outer_archive,
534 })
535 }
536
    /// Returns the path this recording was opened from.
    pub fn archive_path(&self) -> &Utf8Path {
        &self.archive_path
    }
541
    /// Returns the run information obtained from the manifest.
    pub fn run_info(&self) -> RecordedRunInfo {
        self.manifest.run_info()
    }
546
547 pub fn read_run_log(&mut self) -> Result<PortableRecordingRunLog, PortableRecordingReadError> {
553 let run_log_bytes = read_outer_file(
554 &mut self.outer_archive,
555 RUN_LOG_FILE_NAME.into(),
556 &self.archive_path,
557 )?;
558 Ok(PortableRecordingRunLog {
559 archive_path: self.archive_path.clone(),
560 run_log_bytes,
561 })
562 }
563
    /// Extracts the entry `file_name` from the archive to `output_path`.
    ///
    /// When `check_limit` is `true`, the result records whether the entry's
    /// claimed size exceeds `MAX_MAX_OUTPUT_SIZE`; the file is still
    /// extracted in full.
    ///
    /// # Errors
    ///
    /// Propagates errors from the free function of the same name: missing
    /// entry, failure to create or write the output file, or failure to read
    /// the archive.
    pub fn extract_outer_file_to_path(
        &mut self,
        file_name: &'static str,
        output_path: &Utf8Path,
        check_limit: bool,
    ) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
        extract_outer_file_to_path(
            &mut self.outer_archive,
            file_name,
            &self.archive_path,
            output_path,
            check_limit,
        )
    }
586
    /// Opens the store zip nested inside this recording.
    ///
    /// The returned reader borrows this recording mutably for as long as it
    /// lives. Its dictionaries start out unloaded.
    ///
    /// # Errors
    ///
    /// - [`PortableRecordingReadError::MissingFile`] if the archive has no
    ///   `STORE_ZIP_FILE_NAME` entry.
    /// - [`PortableRecordingReadError::CompressedInnerArchive`] if that
    ///   entry is not stored uncompressed.
    /// - [`PortableRecordingReadError::ReadArchive`] if the entry or the
    ///   nested archive cannot be read.
    pub fn open_store(&mut self) -> Result<PortableStoreReader<'_>, PortableRecordingReadError> {
        let file = self
            .outer_archive
            .get_by_name(STORE_ZIP_FILE_NAME)
            .ok_or_else(|| PortableRecordingReadError::MissingFile {
                path: self.archive_path.clone(),
                file_name: Cow::Borrowed(STORE_ZIP_FILE_NAME),
            })?;

        // Only an uncompressed (STORE) entry is accepted.
        // NOTE(review): presumably because the nested archive is read in
        // place through `read_stored` below — confirm against eazip docs.
        let compression = file.metadata().compression_method;
        if compression != CompressionMethod::STORE {
            return Err(PortableRecordingReadError::CompressedInnerArchive {
                archive_path: self.archive_path.clone(),
                compression,
            });
        }

        let raw = file
            .read_stored()
            .map_err(|error| PortableRecordingReadError::ReadArchive {
                path: self.archive_path.clone(),
                error,
            })?;

        // Parse the nested zip directly on top of the stored entry's reader.
        let store_archive =
            Archive::new(raw).map_err(|error| PortableRecordingReadError::ReadArchive {
                path: self.archive_path.clone(),
                error,
            })?;

        Ok(PortableStoreReader {
            archive_path: &self.archive_path,
            store_archive,
            stdout_dict: None,
            stderr_dict: None,
        })
    }
631}
632
633fn read_outer_file(
635 archive: &mut Archive<BufReader<ArchiveReadStorage>>,
636 file_name: Cow<'static, str>,
637 archive_path: &Utf8Path,
638) -> Result<Vec<u8>, PortableRecordingReadError> {
639 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
640 let mut file =
641 archive
642 .get_by_name(&file_name)
643 .ok_or_else(|| PortableRecordingReadError::MissingFile {
644 path: archive_path.to_owned(),
645 file_name: file_name.clone(),
646 })?;
647
648 let claimed_size = file.metadata().uncompressed_size;
649 if claimed_size > limit {
650 return Err(PortableRecordingReadError::FileTooLarge {
651 path: archive_path.to_owned(),
652 file_name,
653 size: claimed_size,
654 limit,
655 });
656 }
657
658 let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
659 let mut contents = Vec::with_capacity(capacity);
660
661 file.read()
662 .and_then(|reader| reader.take(limit).read_to_end(&mut contents))
663 .map_err(|error| PortableRecordingReadError::ReadArchive {
664 path: archive_path.to_owned(),
665 error,
666 })?;
667
668 Ok(contents)
669}
670
671fn extract_outer_file_to_path(
673 archive: &mut Archive<BufReader<ArchiveReadStorage>>,
674 file_name: &'static str,
675 archive_path: &Utf8Path,
676 output_path: &Utf8Path,
677 check_limit: bool,
678) -> Result<ExtractOuterFileResult, PortableRecordingReadError> {
679 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
680 let mut file =
681 archive
682 .get_by_name(file_name)
683 .ok_or_else(|| PortableRecordingReadError::MissingFile {
684 path: archive_path.to_owned(),
685 file_name: Cow::Borrowed(file_name),
686 })?;
687
688 let claimed_size = file.metadata().uncompressed_size;
689 let exceeded_limit = if check_limit && claimed_size > limit {
690 Some(claimed_size)
691 } else {
692 None
693 };
694
695 let mut output_file =
696 File::create(output_path).map_err(|error| PortableRecordingReadError::ExtractFile {
697 archive_path: archive_path.to_owned(),
698 file_name,
699 output_path: output_path.to_owned(),
700 error,
701 })?;
702
703 let mut reader = file
704 .read()
705 .map_err(|error| PortableRecordingReadError::ReadArchive {
706 path: archive_path.to_owned(),
707 error,
708 })?;
709
710 let bytes_written = io::copy(&mut reader, &mut output_file).map_err(|error| {
711 PortableRecordingReadError::ExtractFile {
712 archive_path: archive_path.to_owned(),
713 file_name,
714 output_path: output_path.to_owned(),
715 error,
716 }
717 })?;
718
719 Ok(ExtractOuterFileResult {
720 bytes_written,
721 exceeded_limit,
722 })
723}
724
/// The run log of a portable recording, held in memory in its
/// zstd-compressed form.
#[derive(Debug)]
pub struct PortableRecordingRunLog {
    /// Path of the archive the log was read from; used to label errors.
    archive_path: Utf8PathBuf,
    /// Raw bytes of the run log entry, decoded with zstd by `events`.
    run_log_bytes: Vec<u8>,
}
734
735impl PortableRecordingRunLog {
736 pub fn events(&self) -> Result<PortableRecordingEventIter<'_>, RecordReadError> {
738 let decoder =
741 zstd::stream::Decoder::with_buffer(&self.run_log_bytes[..]).map_err(|error| {
742 RecordReadError::OpenRunLog {
743 path: self.archive_path.join(RUN_LOG_FILE_NAME),
744 error,
745 }
746 })?;
747 Ok(PortableRecordingEventIter {
748 reader: DebugIgnore(BufReader::new(decoder)),
750 line_buf: String::new(),
751 line_number: 0,
752 })
753 }
754}
755
/// Iterator over the events of a [`PortableRecordingRunLog`].
#[derive(Debug)]
pub struct PortableRecordingEventIter<'a> {
    /// Buffered zstd decoder over the in-memory run log bytes.
    reader: DebugIgnore<BufReader<zstd::stream::Decoder<'static, &'a [u8]>>>,
    /// Scratch buffer reused for every line read.
    line_buf: String,
    /// 1-based number of the line most recently read; blank lines count.
    line_number: usize,
}
763
764impl Iterator for PortableRecordingEventIter<'_> {
765 type Item = Result<TestEventSummary<RecordingSpec>, RecordReadError>;
766
767 fn next(&mut self) -> Option<Self::Item> {
768 loop {
769 self.line_buf.clear();
770 self.line_number += 1;
771
772 match self.reader.read_line(&mut self.line_buf) {
773 Ok(0) => return None,
774 Ok(_) => {
775 let trimmed = self.line_buf.trim();
776 if trimmed.is_empty() {
777 continue;
778 }
779 return Some(serde_json::from_str(trimmed).map_err(|error| {
780 RecordReadError::ParseEvent {
781 line_number: self.line_number,
782 error,
783 }
784 }));
785 }
786 Err(error) => {
787 return Some(Err(RecordReadError::ReadRunLog {
788 line_number: self.line_number,
789 error,
790 }));
791 }
792 }
793 }
794 }
795}
796
/// Reader for the store zip nested inside a portable recording.
///
/// Created by `PortableRecording::open_store`; it borrows the recording's
/// archive reader for its whole lifetime.
pub struct PortableStoreReader<'a> {
    /// Path of the outer archive.
    archive_path: &'a Utf8Path,
    /// The nested store archive, read through a bounded view of the outer
    /// archive's reader.
    store_archive: Archive<io::Take<&'a mut BufReader<ArchiveReadStorage>>>,
    /// Dictionary for stdout output; `None` until `load_dictionaries` runs.
    stdout_dict: Option<Vec<u8>>,
    /// Dictionary for stderr output; `None` until `load_dictionaries` runs.
    stderr_dict: Option<Vec<u8>>,
}
809
810impl std::fmt::Debug for PortableStoreReader<'_> {
811 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
812 f.debug_struct("PortableStoreReader")
813 .field("archive_path", &self.archive_path)
814 .field("stdout_dict", &self.stdout_dict.as_ref().map(|d| d.len()))
815 .field("stderr_dict", &self.stderr_dict.as_ref().map(|d| d.len()))
816 .finish_non_exhaustive()
817 }
818}
819
820impl PortableStoreReader<'_> {
821 fn read_store_file(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
823 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
824 let mut file = self.store_archive.get_by_name(file_name).ok_or_else(|| {
825 RecordReadError::FileNotFound {
826 file_name: file_name.to_string(),
827 }
828 })?;
829
830 let claimed_size = file.metadata().uncompressed_size;
831 if claimed_size > limit {
832 return Err(RecordReadError::FileTooLarge {
833 file_name: file_name.to_string(),
834 size: claimed_size,
835 limit,
836 });
837 }
838
839 let capacity = usize::try_from(claimed_size).unwrap_or(usize::MAX);
840 let mut contents = Vec::with_capacity(capacity);
841
842 file.read()
843 .and_then(|reader| reader.take(limit).read_to_end(&mut contents))
844 .map_err(|error| RecordReadError::Decompress {
845 file_name: file_name.to_string(),
846 error,
847 })?;
848
849 let actual_size = contents.len() as u64;
850 if actual_size != claimed_size {
851 return Err(RecordReadError::SizeMismatch {
852 file_name: file_name.to_string(),
853 claimed_size,
854 actual_size,
855 });
856 }
857
858 Ok(contents)
859 }
860
861 fn get_dict_for_output(&self, file_name: &str) -> Option<&[u8]> {
863 match OutputDict::for_output_file_name(file_name) {
864 OutputDict::Stdout => Some(
865 self.stdout_dict
866 .as_ref()
867 .expect("load_dictionaries must be called first"),
868 ),
869 OutputDict::Stderr => Some(
870 self.stderr_dict
871 .as_ref()
872 .expect("load_dictionaries must be called first"),
873 ),
874 OutputDict::None => None,
875 }
876 }
877}
878
879impl StoreReader for PortableStoreReader<'_> {
880 fn read_cargo_metadata(&mut self) -> Result<String, RecordReadError> {
881 let bytes = self.read_store_file(CARGO_METADATA_JSON_PATH)?;
882 String::from_utf8(bytes).map_err(|e| RecordReadError::Decompress {
883 file_name: CARGO_METADATA_JSON_PATH.to_string(),
884 error: std::io::Error::new(std::io::ErrorKind::InvalidData, e),
885 })
886 }
887
888 fn read_test_list(&mut self) -> Result<TestListSummary, RecordReadError> {
889 let bytes = self.read_store_file(TEST_LIST_JSON_PATH)?;
890 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
891 file_name: TEST_LIST_JSON_PATH.to_string(),
892 error,
893 })
894 }
895
896 fn read_record_opts(&mut self) -> Result<RecordOpts, RecordReadError> {
897 let bytes = self.read_store_file(RECORD_OPTS_JSON_PATH)?;
898 serde_json::from_slice(&bytes).map_err(|error| RecordReadError::DeserializeMetadata {
899 file_name: RECORD_OPTS_JSON_PATH.to_string(),
900 error,
901 })
902 }
903
904 fn read_rerun_info(&mut self) -> Result<Option<RerunInfo>, RecordReadError> {
905 match self.read_store_file(RERUN_INFO_JSON_PATH) {
906 Ok(bytes) => {
907 let info = serde_json::from_slice(&bytes).map_err(|error| {
908 RecordReadError::DeserializeMetadata {
909 file_name: RERUN_INFO_JSON_PATH.to_string(),
910 error,
911 }
912 })?;
913 Ok(Some(info))
914 }
915 Err(RecordReadError::FileNotFound { .. }) => {
916 Ok(None)
918 }
919 Err(e) => Err(e),
920 }
921 }
922
923 fn load_dictionaries(&mut self) -> Result<(), RecordReadError> {
924 self.stdout_dict = Some(self.read_store_file(STDOUT_DICT_PATH)?);
925 self.stderr_dict = Some(self.read_store_file(STDERR_DICT_PATH)?);
926 Ok(())
927 }
928
929 fn read_output(&mut self, file_name: &str) -> Result<Vec<u8>, RecordReadError> {
930 let path = format!("out/{file_name}");
931 let compressed = self.read_store_file(&path)?;
932 let limit = MAX_MAX_OUTPUT_SIZE.as_u64();
933
934 let dict_bytes = self.get_dict_for_output(file_name).ok_or_else(|| {
935 RecordReadError::UnknownOutputType {
936 file_name: file_name.to_owned(),
937 }
938 })?;
939
940 decompress_with_dict(&compressed, dict_bytes, limit).map_err(|error| {
941 RecordReadError::Decompress {
942 file_name: path,
943 error,
944 }
945 })
946 }
947
948 fn extract_file_to_path(
949 &mut self,
950 store_path: &str,
951 output_path: &Utf8Path,
952 ) -> Result<u64, RecordReadError> {
953 let mut file = self.store_archive.get_by_name(store_path).ok_or_else(|| {
954 RecordReadError::FileNotFound {
955 file_name: store_path.to_owned(),
956 }
957 })?;
958
959 let mut output_file =
960 File::create(output_path).map_err(|error| RecordReadError::ExtractFile {
961 store_path: store_path.to_owned(),
962 output_path: output_path.to_owned(),
963 error,
964 })?;
965
966 let mut reader = file
967 .read()
968 .map_err(|error| RecordReadError::ReadArchiveFile {
969 file_name: store_path.to_owned(),
970 error,
971 })?;
972
973 io::copy(&mut reader, &mut output_file).map_err(|error| RecordReadError::ExtractFile {
974 store_path: store_path.to_owned(),
975 output_path: output_path.to_owned(),
976 error,
977 })
978 }
979}
980
981#[cfg(test)]
982mod tests {
983 use super::*;
984 use crate::record::{
985 format::{PORTABLE_RECORDING_FORMAT_VERSION, STORE_FORMAT_VERSION},
986 store::{CompletedRunStats, RecordedRunStatus, RecordedSizes},
987 };
988 use camino_tempfile::{NamedUtf8TempFile, Utf8TempDir};
989 use chrono::Local;
990 use eazip::write::FileOptions;
991 use quick_junit::ReportUuid;
992 use semver::Version;
993 use std::{collections::BTreeMap, io::Read};
994
    /// Creates a temporary runs directory containing a run directory for
    /// `run_id`, populated with a small store zip and a zstd-compressed run
    /// log.
    ///
    /// Returns the temp-dir guard (keep it alive for the duration of the
    /// test) together with the path of the runs directory.
    fn create_test_run_dir(run_id: ReportUuid) -> (Utf8TempDir, Utf8PathBuf) {
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_dir = runs_dir.join(run_id.to_string());
        std::fs::create_dir_all(&run_dir).expect("create run dir");

        // A minimal store zip holding a single file.
        let store_path = run_dir.join(STORE_ZIP_FILE_NAME);
        let store_file = File::create(&store_path).expect("create store.zip");
        let mut zip_writer = ArchiveWriter::new(store_file);
        let options = FileOptions::default();
        zip_writer
            .add_file("test.txt", &b"test content"[..], &options)
            .expect("add file");
        zip_writer.finish().expect("finish zip");

        // A zstd-compressed run log with placeholder content.
        let log_path = run_dir.join(RUN_LOG_FILE_NAME);
        let log_file = File::create(&log_path).expect("create run.log.zst");
        let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
        encoder.write_all(b"test log content").expect("write log");
        encoder.finish().expect("finish encoder");

        (temp_dir, runs_dir)
    }
1018
    /// Builds a `RecordedRunInfo` fixture for `run_id` describing a
    /// completed run, with the current local time as both start and
    /// last-written timestamps.
    fn create_test_run_info(run_id: ReportUuid) -> RecordedRunInfo {
        let now = Local::now().fixed_offset();
        RecordedRunInfo {
            run_id,
            store_format_version: STORE_FORMAT_VERSION,
            nextest_version: Version::new(0, 9, 111),
            started_at: now,
            last_written_at: now,
            duration_secs: Some(12.345),
            cli_args: vec!["cargo".to_owned(), "nextest".to_owned(), "run".to_owned()],
            build_scope_args: vec!["--workspace".to_owned()],
            env_vars: BTreeMap::from([("CARGO_TERM_COLOR".to_owned(), "always".to_owned())]),
            parent_run_id: None,
            sizes: RecordedSizes::default(),
            status: RecordedRunStatus::Completed(CompletedRunStats {
                initial_run_count: 10,
                passed: 9,
                failed: 1,
                exit_code: 100,
            }),
        }
    }
1041
    /// The default file name embeds the hyphenated run ID.
    #[test]
    fn test_default_filename() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
        let run_info = create_test_run_info(run_id);

        let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
            .expect("create writer");

        assert_eq!(
            writer.default_filename(),
            "nextest-run-550e8400-e29b-41d4-a716-446655440000.zip"
        );
    }
1056
    /// Writing a recording produces an archive with the expected name,
    /// reported size and three entries (manifest, store zip, run log).
    #[test]
    fn test_write_portable_recording() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        let (_temp_dir, runs_dir) = create_test_run_dir(run_id);
        let run_info = create_test_run_info(run_id);

        let writer = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir))
            .expect("create writer");

        let output_dir = camino_tempfile::tempdir().expect("create output dir");

        let result = writer
            .write_to_dir(output_dir.path())
            .expect("write archive");

        assert!(result.path.exists());
        assert!(result.size > 0);

        // The reported size must agree with what is actually on disk.
        let actual_size = std::fs::metadata(&result.path)
            .expect("get file metadata")
            .len();
        assert_eq!(
            result.size, actual_size,
            "reported size should match actual file size"
        );

        assert_eq!(
            result.path.file_name(),
            Some("nextest-run-550e8400-e29b-41d4-a716-446655440000.zip")
        );

        // Re-open the archive and inspect its contents.
        let archive_file = File::open(&result.path).expect("open archive");
        let mut archive = Archive::new(BufReader::new(archive_file)).expect("read archive");

        assert_eq!(archive.entries().len(), 3);

        // The manifest round-trips and carries the expected version and
        // run ID.
        {
            let mut manifest_file = archive
                .get_by_name(PORTABLE_MANIFEST_FILE_NAME)
                .expect("manifest");
            let mut manifest_content = String::new();
            manifest_file
                .read()
                .expect("get reader")
                .read_to_string(&mut manifest_content)
                .expect("read manifest");
            let manifest: PortableManifest =
                serde_json::from_str(&manifest_content).expect("parse manifest");
            assert_eq!(manifest.format_version, PORTABLE_RECORDING_FORMAT_VERSION);
            assert_eq!(manifest.run.run_id, run_id);
        }

        // The store zip is present and non-empty.
        {
            let store_file = archive.get_by_name(STORE_ZIP_FILE_NAME).expect("store.zip");
            assert!(store_file.metadata().uncompressed_size > 0);
        }

        // The run log is present and non-empty.
        {
            let log_file = archive.get_by_name(RUN_LOG_FILE_NAME).expect("run.log.zst");
            assert!(log_file.metadata().uncompressed_size > 0);
        }
    }
1120
    /// Creating a writer fails with `RunDirNotFound` when the run directory
    /// does not exist.
    #[test]
    fn test_missing_run_dir() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        // The runs directory exists but contains no directory for `run_id`.
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_info = create_test_run_info(run_id);

        let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));

        assert!(matches!(
            result,
            Err(PortableRecordingError::RunDirNotFound { .. })
        ));
    }
1135
    /// Creating a writer fails with `RequiredFileMissing` naming the store
    /// zip when only the run log is present.
    #[test]
    fn test_missing_store_zip() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_dir = runs_dir.join(run_id.to_string());
        std::fs::create_dir_all(&run_dir).expect("create run dir");

        // Only the run log is created; the store zip is deliberately absent.
        let log_path = run_dir.join(RUN_LOG_FILE_NAME);
        let log_file = File::create(&log_path).expect("create run.log.zst");
        let mut encoder = zstd::stream::Encoder::new(log_file, 3).expect("create encoder");
        encoder.write_all(b"test").expect("write");
        encoder.finish().expect("finish");

        let run_info = create_test_run_info(run_id);
        let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));

        assert!(
            matches!(
                &result,
                Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
                if *file_name == STORE_ZIP_FILE_NAME
            ),
            "expected RequiredFileMissing for store.zip, got {result:?}"
        );
    }
1162
    /// Creating a writer fails with `RequiredFileMissing` naming the run
    /// log when only the store zip is present.
    #[test]
    fn test_missing_run_log() {
        let run_id = ReportUuid::from_u128(0x550e8400_e29b_41d4_a716_446655440000);
        let temp_dir = camino_tempfile::tempdir().expect("create temp dir");
        let runs_dir = temp_dir.path().to_owned();
        let run_dir = runs_dir.join(run_id.to_string());
        std::fs::create_dir_all(&run_dir).expect("create run dir");

        // Only the store zip is created; the run log is deliberately absent.
        let store_path = run_dir.join(STORE_ZIP_FILE_NAME);
        let store_file = File::create(&store_path).expect("create store.zip");
        let mut zip_writer = ArchiveWriter::new(store_file);
        let options = FileOptions::default();
        zip_writer
            .add_file("test.txt", &b"test"[..], &options)
            .expect("add file");
        zip_writer.finish().expect("finish");

        let run_info = create_test_run_info(run_id);
        let result = PortableRecordingWriter::new(&run_info, StoreRunsDir::new(&runs_dir));

        assert!(
            matches!(
                &result,
                Err(PortableRecordingError::RequiredFileMissing { file_name, .. })
                if *file_name == RUN_LOG_FILE_NAME
            ),
            "expected RequiredFileMissing for run.log.zst, got {result:?}"
        );
    }
1192
    /// A regular file is already seekable, so `ensure_seekable` hands it
    /// back without spooling.
    #[test]
    fn test_ensure_seekable_regular_file() {
        let temp = NamedUtf8TempFile::new().expect("created temp file");
        let path = temp.path().to_owned();

        std::fs::write(&path, b"hello world").expect("wrote to temp file");
        let file = File::open(&path).expect("opened temp file");

        // Remember the descriptor so we can check the same file comes back.
        #[cfg(unix)]
        let original_fd = {
            use std::os::unix::io::AsRawFd;
            file.as_raw_fd()
        };

        let result = ensure_seekable(file, &path).expect("ensure_seekable succeeded");

        // An unchanged descriptor shows the file was not replaced by a
        // spooled copy.
        #[cfg(unix)]
        {
            use std::os::unix::io::AsRawFd;
            assert_eq!(
                result.as_raw_fd(),
                original_fd,
                "seekable file should be returned as-is"
            );
        }

        // The contents are still readable from the start.
        let mut contents = String::new();
        let mut reader = io::BufReader::new(result);
        reader
            .read_to_string(&mut contents)
            .expect("read file contents");
        assert_eq!(contents, "hello world");
    }
1230
    /// Converts the read end of a pipe into a `File` (Unix: via its owned
    /// file descriptor).
    #[cfg(unix)]
    fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
        use std::os::fd::OwnedFd;
        File::from(OwnedFd::from(reader))
    }
1238
    /// Converts the read end of a pipe into a `File` (Windows: via its owned
    /// handle).
    #[cfg(windows)]
    fn pipe_reader_to_file(reader: std::io::PipeReader) -> File {
        use std::os::windows::io::OwnedHandle;
        File::from(OwnedHandle::from(reader))
    }
1246
    /// A pipe is not seekable, so `ensure_seekable` spools it; the returned
    /// file yields the pipe's full contents from the start.
    #[test]
    fn test_ensure_seekable_pipe() {
        let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");
        let test_data = b"zip-like test content for pipe spooling";

        pipe_writer.write_all(test_data).expect("wrote to pipe");
        // Close the write end so that the reader sees end-of-file.
        drop(pipe_writer);

        let pipe_file = pipe_reader_to_file(pipe_reader);

        // The path is only used to label errors; it need not exist.
        let path = Utf8Path::new("/dev/fd/99");
        let result = ensure_seekable(pipe_file, path).expect("ensure_seekable succeeded");

        let mut contents = Vec::new();
        let mut reader = io::BufReader::new(result);
        reader
            .read_to_end(&mut contents)
            .expect("read spooled contents");
        assert_eq!(contents, test_data);
    }
1273
    /// An empty pipe spools successfully into an empty file.
    #[test]
    fn test_ensure_seekable_empty_pipe() {
        let (pipe_reader, pipe_writer) = std::io::pipe().expect("created pipe");
        // Close the write end immediately: the reader sees EOF with no data.
        drop(pipe_writer);

        let pipe_file = pipe_reader_to_file(pipe_reader);
        let path = Utf8Path::new("/dev/fd/42");
        let mut result = ensure_seekable(pipe_file, path).expect("empty pipe should succeed");

        let mut contents = Vec::new();
        result.read_to_end(&mut contents).expect("read contents");
        assert!(contents.is_empty());
    }
1293
    /// Spooling input well over the limit (20 bytes against a 10-byte limit)
    /// fails with `SpoolTooLarge`.
    #[test]
    fn test_ensure_seekable_spool_too_large() {
        let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");

        // 20 bytes: twice the limit used below.
        pipe_writer
            .write_all(b"01234567890123456789")
            .expect("wrote to pipe");
        drop(pipe_writer);

        let pipe_file = pipe_reader_to_file(pipe_reader);

        let path = Utf8Path::new("/dev/fd/42");
        let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
        assert!(
            matches!(
                &result,
                Err(PortableRecordingReadError::SpoolTooLarge {
                    limit: ByteSize(10),
                    ..
                })
            ),
            "expected SpoolTooLarge, got {result:?}"
        );
    }
1323
    /// Boundary case: input exactly one byte over the limit (11 bytes
    /// against a 10-byte limit) fails with `SpoolTooLarge`.
    #[test]
    fn test_ensure_seekable_spool_one_over_limit() {
        let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");

        // 11 bytes: limit + 1.
        pipe_writer
            .write_all(b"01234567890")
            .expect("wrote to pipe");
        drop(pipe_writer);

        let pipe_file = pipe_reader_to_file(pipe_reader);

        let path = Utf8Path::new("/dev/fd/42");
        let result = ensure_seekable_impl(pipe_file, path, ByteSize(10));
        assert!(
            matches!(
                &result,
                Err(PortableRecordingReadError::SpoolTooLarge {
                    limit: ByteSize(10),
                    ..
                })
            ),
            "expected SpoolTooLarge at limit+1 bytes, got {result:?}"
        );
    }
1353
    /// Boundary case: input exactly at the limit (10 bytes against a 10-byte
    /// limit) spools successfully and round-trips intact.
    #[test]
    fn test_ensure_seekable_spool_exact_limit() {
        let (pipe_reader, mut pipe_writer) = std::io::pipe().expect("created pipe");

        // 10 bytes: exactly the limit.
        pipe_writer.write_all(b"0123456789").expect("wrote to pipe");
        drop(pipe_writer);

        let pipe_file = pipe_reader_to_file(pipe_reader);

        let path = Utf8Path::new("/dev/fd/42");
        let mut result = ensure_seekable_impl(pipe_file, path, ByteSize(10))
            .expect("exact limit should succeed");

        let mut contents = Vec::new();
        result.read_to_end(&mut contents).expect("read contents");
        assert_eq!(contents, b"0123456789");
    }
1374}