nextest_runner/record/
store.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Run store management for nextest recordings.
5//!
6//! The run store is a directory that contains all recorded test runs. It provides:
7//!
8//! - A lock file for exclusive access during modifications.
9//! - A zstd-compressed JSON file (`runs.json.zst`) listing all recorded runs.
10//! - Individual directories for each run containing the archive and log.
11
12use super::{
13    display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
14    format::{
15        RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission, STORE_FORMAT_VERSION,
16        STORE_ZIP_FILE_NAME, StoreFormatVersion, StoreVersionIncompatibility,
17    },
18    recorder::{RunRecorder, StoreSizes},
19    retention::{
20        PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
21    },
22    run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
23};
24use crate::{
25    errors::{RunIdResolutionError, RunStoreError},
26    helpers::{ThemeCharacters, decimal_char_width},
27    redact::Redactor,
28};
29use camino::{Utf8Path, Utf8PathBuf};
30use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
31use debug_ignore::DebugIgnore;
32use quick_junit::ReportUuid;
33use semver::Version;
34use std::{
35    collections::{BTreeMap, HashMap, HashSet},
36    fmt,
37    fs::{File, TryLockError},
38    io,
39    num::NonZero,
40    thread,
41    time::{Duration, Instant},
42};
43
// Name of the lock file used to serialize access to the run store.
static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
// Name of the zstd-compressed JSON file listing all recorded runs.
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";
46
/// A reference to the runs directory in a run store.
///
/// This provides methods to compute paths within the runs directory.
///
/// This is a `Copy` wrapper around a borrowed path, so it is cheap to pass by
/// value.
#[derive(Clone, Copy, Debug)]
pub struct StoreRunsDir<'a>(&'a Utf8Path);
52
53impl<'a> StoreRunsDir<'a> {
54    /// Creates a new `StoreRunsDir` from a path.
55    pub fn new(path: &'a Utf8Path) -> Self {
56        Self(path)
57    }
58
59    /// Returns the path to a specific run's directory.
60    pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
61        self.0.join(run_id.to_string())
62    }
63
64    /// Returns a [`RunFilesExist`] implementation for checking file existence
65    /// for a specific run in this store.
66    pub fn run_files(self, run_id: ReportUuid) -> StoreRunFiles {
67        StoreRunFiles {
68            run_dir: self.run_dir(run_id),
69        }
70    }
71
72    /// Returns the underlying path to the runs directory.
73    pub fn as_path(self) -> &'a Utf8Path {
74        self.0
75    }
76}
77
/// Trait for checking whether required run files exist.
///
/// This abstracts over different storage backends. [`StoreRunFiles`] is the
/// on-disk implementation in this module.
pub trait RunFilesExist {
    /// Returns true if `store.zip` exists.
    fn store_zip_exists(&self) -> bool;
    /// Returns true if `run.log.zst` exists.
    fn run_log_exists(&self) -> bool;
}
87
/// Checks file existence for a run stored on disk.
///
/// Created via [`StoreRunsDir::run_files`].
pub struct StoreRunFiles {
    // The run's directory; the expected files live directly inside it.
    run_dir: Utf8PathBuf,
}
94
95impl RunFilesExist for StoreRunFiles {
96    fn store_zip_exists(&self) -> bool {
97        self.run_dir.join(STORE_ZIP_FILE_NAME).exists()
98    }
99
100    fn run_log_exists(&self) -> bool {
101        self.run_dir.join(RUN_LOG_FILE_NAME).exists()
102    }
103}
104
/// Manages the storage of recorded test runs.
///
/// The run store is a directory containing a list of recorded runs and their data.
/// Use [`RunStore::lock_exclusive`] to acquire exclusive access before creating
/// new runs.
#[derive(Debug)]
pub struct RunStore {
    // The `runs` subdirectory of the store; all per-run data and the lock and
    // list files live here.
    runs_dir: Utf8PathBuf,
}
114
115impl RunStore {
116    /// Creates a new `RunStore` at the given directory.
117    ///
118    /// Creates the directory if it doesn't exist.
119    pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
120        let runs_dir = store_dir.join("runs");
121        std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
122            run_dir: runs_dir.clone(),
123            error,
124        })?;
125
126        Ok(Self { runs_dir })
127    }
128
129    /// Returns the runs directory.
130    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
131        StoreRunsDir(&self.runs_dir)
132    }
133
134    /// Acquires a shared lock on the run store for reading.
135    ///
136    /// Multiple readers can hold the shared lock simultaneously, but the shared
137    /// lock is exclusive with the exclusive lock (used for writing).
138    ///
139    /// Uses non-blocking lock attempts with retries to handle both brief
140    /// contention and filesystems where locking may not work (e.g., NFS).
141    pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
142        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
143        let file = std::fs::OpenOptions::new()
144            .create(true)
145            .truncate(false)
146            .write(true)
147            .open(&lock_file_path)
148            .map_err(|error| RunStoreError::FileLock {
149                path: lock_file_path.clone(),
150                error,
151            })?;
152
153        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
154        let result = read_runs_json(&self.runs_dir)?;
155        let run_id_index = RunIdIndex::new(&result.runs);
156
157        Ok(SharedLockedRunStore {
158            runs_dir: StoreRunsDir(&self.runs_dir),
159            locked_file: DebugIgnore(file),
160            runs: result.runs,
161            write_permission: result.write_permission,
162            run_id_index,
163        })
164    }
165
166    /// Acquires an exclusive lock on the run store.
167    ///
168    /// This lock should only be held for a short duration (just long enough to
169    /// add a run to the list and create its directory).
170    ///
171    /// Uses non-blocking lock attempts with retries to handle both brief
172    /// contention and filesystems where locking may not work (e.g., NFS).
173    pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
174        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
175        let file = std::fs::OpenOptions::new()
176            .create(true)
177            .truncate(false)
178            .write(true)
179            .open(&lock_file_path)
180            .map_err(|error| RunStoreError::FileLock {
181                path: lock_file_path.clone(),
182                error,
183            })?;
184
185        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
186        let result = read_runs_json(&self.runs_dir)?;
187
188        Ok(ExclusiveLockedRunStore {
189            runs_dir: StoreRunsDir(&self.runs_dir),
190            locked_file: DebugIgnore(file),
191            runs: result.runs,
192            last_pruned_at: result.last_pruned_at,
193            write_permission: result.write_permission,
194        })
195    }
196}
197
/// A run store that has been locked for exclusive access.
///
/// The lifetime parameter ensures this isn't held for longer than the
/// corresponding [`RunStore`].
#[derive(Debug)]
pub struct ExclusiveLockedRunStore<'store> {
    // Path helper for the runs directory being operated on.
    runs_dir: StoreRunsDir<'store>,
    // Held for RAII lock semantics; the lock is released when this struct is dropped.
    #[expect(dead_code)]
    locked_file: DebugIgnore<File>,
    // In-memory copy of the run list read from runs.json.zst at lock time.
    runs: Vec<RecordedRunInfo>,
    // When pruning last ran, as recorded in runs.json.zst (None if never).
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest version is allowed to rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
}
212
213impl<'store> ExclusiveLockedRunStore<'store> {
214    /// Returns the runs directory.
215    pub fn runs_dir(&self) -> StoreRunsDir<'store> {
216        self.runs_dir
217    }
218
219    /// Returns whether this nextest can write to the runs.json.zst file.
220    ///
221    /// If the file has a newer format version than we support, writing is denied.
222    pub fn write_permission(&self) -> RunsJsonWritePermission {
223        self.write_permission
224    }
225
226    /// Marks a run as completed and persists the change to disk.
227    ///
228    /// Updates sizes, `status`, and `duration_secs` to the given values.
229    /// Returns `true` if the run was found and updated, `false` if no run
230    /// with the given ID exists (in which case nothing is persisted).
231    ///
232    /// Returns an error if writing is denied due to a format version mismatch.
233    ///
234    /// The status should not be `Incomplete` since we're completing the run.
235    pub fn complete_run(
236        &mut self,
237        run_id: ReportUuid,
238        sizes: StoreSizes,
239        status: RecordedRunStatus,
240        duration_secs: Option<f64>,
241    ) -> Result<bool, RunStoreError> {
242        if let RunsJsonWritePermission::Denied {
243            file_version,
244            max_supported_version,
245        } = self.write_permission
246        {
247            return Err(RunStoreError::FormatVersionTooNew {
248                file_version,
249                max_supported_version,
250            });
251        }
252
253        let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
254        if found {
255            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
256        }
257        Ok(found)
258    }
259
260    /// Updates a run's metadata in memory.
261    fn mark_run_completed_inner(
262        &mut self,
263        run_id: ReportUuid,
264        sizes: StoreSizes,
265        status: RecordedRunStatus,
266        duration_secs: Option<f64>,
267    ) -> bool {
268        for run in &mut self.runs {
269            if run.run_id == run_id {
270                run.sizes = RecordedSizes {
271                    log: ComponentSizes {
272                        compressed: sizes.log.compressed,
273                        uncompressed: sizes.log.uncompressed,
274                        entries: sizes.log.entries,
275                    },
276                    store: ComponentSizes {
277                        compressed: sizes.store.compressed,
278                        uncompressed: sizes.store.uncompressed,
279                        entries: sizes.store.entries,
280                    },
281                };
282                run.status = status;
283                run.duration_secs = duration_secs;
284                run.last_written_at = Local::now().fixed_offset();
285                return true;
286            }
287        }
288        false
289    }
290
291    /// Prunes runs according to the given retention policy.
292    ///
293    /// This method:
294    /// 1. Determines which runs to delete based on the policy
295    /// 2. Deletes those run directories from disk
296    /// 3. Deletes any orphaned directories not tracked in runs.json.zst
297    /// 4. Updates the run list in memory and on disk
298    ///
299    /// The `kind` parameter indicates whether this is explicit pruning (from a
300    /// user command) or implicit pruning (automatic during recording). This
301    /// affects how errors are displayed.
302    ///
303    /// Returns the result of the pruning operation, including any errors that
304    /// occurred while deleting individual runs.
305    ///
306    /// Returns an error if writing is denied due to a format version mismatch.
307    pub fn prune(
308        &mut self,
309        policy: &RecordRetentionPolicy,
310        kind: PruneKind,
311    ) -> Result<PruneResult, RunStoreError> {
312        if let RunsJsonWritePermission::Denied {
313            file_version,
314            max_supported_version,
315        } = self.write_permission
316        {
317            return Err(RunStoreError::FormatVersionTooNew {
318                file_version,
319                max_supported_version,
320            });
321        }
322
323        let now = Utc::now();
324        let to_delete: HashSet<_> = policy
325            .compute_runs_to_delete(&self.runs, now)
326            .into_iter()
327            .collect();
328
329        let runs_dir = self.runs_dir();
330        let mut result = if to_delete.is_empty() {
331            PruneResult::default()
332        } else {
333            delete_runs(runs_dir, &mut self.runs, &to_delete)
334        };
335        result.kind = kind;
336
337        let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
338        delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);
339
340        if result.deleted_count > 0 || result.orphans_deleted > 0 {
341            // Update last_pruned_at since we performed pruning.
342            self.last_pruned_at = Some(now);
343            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
344        }
345
346        Ok(result)
347    }
348
349    /// Prunes runs if needed, based on time since last prune and limit thresholds.
350    ///
351    /// This method implements implicit pruning, which occurs:
352    /// - If more than 1 day has passed since the last prune, OR
353    /// - If any retention limit is exceeded by 1.5x.
354    ///
355    /// Use [`Self::prune`] for explicit pruning that always runs regardless of these conditions.
356    ///
357    /// Returns `Ok(None)` if pruning was skipped, `Ok(Some(result))` if pruning occurred.
358    pub fn prune_if_needed(
359        &mut self,
360        policy: &RecordRetentionPolicy,
361    ) -> Result<Option<PruneResult>, RunStoreError> {
362        const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
363            Some(d) => d,
364            None => panic!("1 day should always be a valid TimeDelta"),
365        };
366        const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;
367
368        let now = Utc::now();
369
370        // Check if pruning is needed.
371        let time_since_last_prune = self
372            .last_pruned_at
373            .map(|last| now.signed_duration_since(last))
374            .unwrap_or(TimeDelta::MAX);
375
376        let should_prune = time_since_last_prune >= PRUNE_INTERVAL
377            || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);
378
379        if should_prune {
380            Ok(Some(self.prune(policy, PruneKind::Implicit)?))
381        } else {
382            Ok(None)
383        }
384    }
385
386    /// Creates a run recorder for a new run.
387    ///
388    /// Adds the run to the list and creates its directory. Consumes self,
389    /// dropping the exclusive lock.
390    ///
391    /// `max_output_size` specifies the maximum size of a single output (stdout/stderr)
392    /// before truncation.
393    ///
394    /// Returns the recorder and the shortest unique prefix for the run ID (for
395    /// display purposes), or an error if writing is denied due to a format
396    /// version mismatch.
397    #[expect(clippy::too_many_arguments)]
398    pub(crate) fn create_run_recorder(
399        mut self,
400        run_id: ReportUuid,
401        nextest_version: Version,
402        started_at: DateTime<FixedOffset>,
403        cli_args: Vec<String>,
404        build_scope_args: Vec<String>,
405        env_vars: BTreeMap<String, String>,
406        max_output_size: bytesize::ByteSize,
407        parent_run_id: Option<ReportUuid>,
408    ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
409        if let RunsJsonWritePermission::Denied {
410            file_version,
411            max_supported_version,
412        } = self.write_permission
413        {
414            return Err(RunStoreError::FormatVersionTooNew {
415                file_version,
416                max_supported_version,
417            });
418        }
419
420        // Add to the list of runs before creating the directory. This ensures
421        // that if creation fails, an empty run directory isn't left behind. (It
422        // does mean that there may be spurious entries in the list of runs,
423        // which will be dealt with during pruning.)
424
425        let now = Local::now().fixed_offset();
426        let run = RecordedRunInfo {
427            run_id,
428            store_format_version: STORE_FORMAT_VERSION,
429            nextest_version,
430            started_at,
431            last_written_at: now,
432            duration_secs: None,
433            cli_args,
434            build_scope_args,
435            env_vars,
436            parent_run_id,
437            sizes: RecordedSizes::default(),
438            status: RecordedRunStatus::Incomplete,
439        };
440        self.runs.push(run);
441
442        // If the parent run ID is set, update its last written at time.
443        if let Some(parent_run_id) = parent_run_id
444            && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
445        {
446            parent_run.last_written_at = now;
447        }
448
449        write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
450
451        // Compute the unique prefix now that the run is in the list.
452        let index = RunIdIndex::new(&self.runs);
453        let unique_prefix = index
454            .shortest_unique_prefix(run_id)
455            .expect("run was just added to the list");
456
457        // Create the run directory while still holding the lock. This prevents
458        // a race where another process could prune the newly-added run entry
459        // before the directory exists, leaving an orphaned directory. The lock
460        // is released when `self` is dropped.
461        let run_dir = self.runs_dir().run_dir(run_id);
462
463        let recorder = RunRecorder::new(run_dir, max_output_size)?;
464        Ok((recorder, unique_prefix))
465    }
466}
467
/// Information about a recorded run.
///
/// Entries of this type make up the run list persisted in `runs.json.zst`
/// (round-tripped through `RecordedRunList` in the format module).
#[derive(Clone, Debug)]
pub struct RecordedRunInfo {
    /// The unique identifier for this run.
    pub run_id: ReportUuid,
    /// The format version of this run's store.zip archive.
    ///
    /// This allows checking replayability without opening the archive.
    pub store_format_version: StoreFormatVersion,
    /// The version of nextest that created this run.
    pub nextest_version: Version,
    /// When the run started.
    pub started_at: DateTime<FixedOffset>,
    /// When this run was last written to.
    ///
    /// Used for LRU eviction. Updated when the run is created, when the run
    /// completes, and when a rerun references this run.
    pub last_written_at: DateTime<FixedOffset>,
    /// Duration of the run in seconds.
    ///
    /// This is `None` for incomplete runs.
    pub duration_secs: Option<f64>,
    /// The command-line arguments used to invoke nextest.
    pub cli_args: Vec<String>,
    /// Build scope arguments (package and target selection).
    ///
    /// These determine which packages and targets are built. In a rerun chain,
    /// these are inherited from the original run unless explicitly overridden.
    pub build_scope_args: Vec<String>,
    /// Environment variables that affect nextest behavior (NEXTEST_* and CARGO_*).
    pub env_vars: BTreeMap<String, String>,
    /// If this is a rerun, the ID of the parent run.
    ///
    /// This forms a chain for iterative fix-and-rerun workflows.
    pub parent_run_id: Option<ReportUuid>,
    /// Sizes broken down by component (log and store).
    pub sizes: RecordedSizes,
    /// The status and statistics for this run.
    pub status: RecordedRunStatus,
}
508
/// Sizes broken down by component (log and store).
///
/// Aggregate totals across both components are available via the methods on
/// this type.
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log (run.log.zst).
    pub log: ComponentSizes,
    /// Sizes for the store archive (store.zip).
    pub store: ComponentSizes,
}
517
/// Compressed and uncompressed sizes for a single component.
///
/// Defaults to all zeros (e.g. for a run that has just been created).
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Compressed size in bytes.
    pub compressed: u64,
    /// Uncompressed size in bytes.
    pub uncompressed: u64,
    /// Number of entries (records for log, files for store).
    pub entries: u64,
}
528
529impl RecordedSizes {
530    /// Returns the total compressed size (log + store).
531    pub fn total_compressed(&self) -> u64 {
532        self.log.compressed + self.store.compressed
533    }
534
535    /// Returns the total uncompressed size (log + store).
536    pub fn total_uncompressed(&self) -> u64 {
537        self.log.uncompressed + self.store.uncompressed
538    }
539
540    /// Returns the total number of entries (log records + store files).
541    pub fn total_entries(&self) -> u64 {
542        self.log.entries + self.store.entries
543    }
544}
545
/// Status and statistics for a recorded run.
///
/// Display strings for these variants are provided by
/// [`Self::short_status_str`].
#[derive(Clone, Debug)]
pub enum RecordedRunStatus {
    /// The run was interrupted before completion.
    Incomplete,
    /// A normal test run completed (all tests finished).
    Completed(CompletedRunStats),
    /// A normal test run was cancelled before all tests finished.
    Cancelled(CompletedRunStats),
    /// A stress test run completed (all iterations finished).
    StressCompleted(StressCompletedRunStats),
    /// A stress test run was cancelled before all iterations finished.
    StressCancelled(StressCompletedRunStats),
    /// An unknown status from a newer version of nextest.
    ///
    /// This variant is used for forward compatibility when reading runs.json.zst
    /// files created by newer nextest versions that may have new status types.
    Unknown,
}
565
566impl RecordedRunStatus {
567    /// Returns a short status string for display.
568    pub fn short_status_str(&self) -> &'static str {
569        match self {
570            Self::Incomplete => "incomplete",
571            Self::Unknown => "unknown",
572            Self::Completed(_) => "completed",
573            Self::Cancelled(_) => "cancelled",
574            Self::StressCompleted(_) => "stress completed",
575            Self::StressCancelled(_) => "stress cancelled",
576        }
577    }
578
579    /// Returns the exit code for completed runs, or `None` for incomplete/unknown runs.
580    pub fn exit_code(&self) -> Option<i32> {
581        match self {
582            Self::Incomplete | Self::Unknown => None,
583            Self::Completed(stats) | Self::Cancelled(stats) => Some(stats.exit_code),
584            Self::StressCompleted(stats) | Self::StressCancelled(stats) => Some(stats.exit_code),
585        }
586    }
587}
588
/// Statistics for a normal test run that finished (completed or cancelled).
///
/// Used by both the `Completed` and `Cancelled` variants of
/// [`RecordedRunStatus`].
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// The number of tests that were expected to run.
    pub initial_run_count: usize,
    /// The number of tests that passed.
    pub passed: usize,
    /// The number of tests that failed (including exec failures and timeouts).
    pub failed: usize,
    /// The exit code from the run.
    pub exit_code: i32,
}
601
/// Statistics for a stress test run that finished (completed or cancelled).
///
/// Used by both the `StressCompleted` and `StressCancelled` variants of
/// [`RecordedRunStatus`].
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// The number of stress iterations that were expected to run, if known.
    ///
    /// This is `None` when the stress test was run without a fixed iteration count
    /// (e.g., `--stress-duration`).
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// The number of stress iterations that succeeded.
    pub success_count: u32,
    /// The number of stress iterations that failed.
    pub failed_count: u32,
    /// The exit code from the run.
    pub exit_code: i32,
}
617
618// ---
619// Replayability checking
620// ---
621
/// The result of checking whether a run can be replayed.
///
/// Produced by [`RecordedRunInfo::check_replayability`].
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// The run is definitely replayable.
    ///
    /// No blocking reasons and no uncertain conditions.
    Replayable,
    /// The run is definitely not replayable.
    ///
    /// Contains at least one blocking reason.
    NotReplayable(Vec<NonReplayableReason>),
    /// The run might be replayable but is incomplete.
    ///
    /// The archive might be usable, but we'd need to open `store.zip` to
    /// verify all expected files are present.
    Incomplete,
}
639
/// A definite reason why a run cannot be replayed.
///
/// Carried by [`ReplayabilityStatus::NotReplayable`]; rendered for users via
/// its `Display` impl.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run's store format version is incompatible with this nextest version.
    ///
    /// This nextest version cannot read the archive format.
    StoreVersionIncompatible {
        /// The specific incompatibility.
        incompatibility: StoreVersionIncompatibility,
    },
    /// The `store.zip` file is missing from the run directory.
    MissingStoreZip,
    /// The `run.log.zst` file is missing from the run directory.
    MissingRunLog,
    /// The run status is `Unknown` (from a newer nextest version).
    ///
    /// We cannot safely replay since we don't understand the run's state.
    StatusUnknown,
}
659
660impl fmt::Display for NonReplayableReason {
661    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662        match self {
663            Self::StoreVersionIncompatible { incompatibility } => {
664                write!(f, "store format version incompatible: {}", incompatibility)
665            }
666            Self::MissingStoreZip => {
667                write!(f, "store.zip is missing")
668            }
669            Self::MissingRunLog => {
670                write!(f, "run.log.zst is missing")
671            }
672            Self::StatusUnknown => {
673                write!(f, "run status is unknown (from a newer nextest version)")
674            }
675        }
676    }
677}
678
/// Result of looking up a run by selector.
///
/// Wrapped in a struct so additional lookup metadata can be added later
/// without changing call sites.
#[derive(Clone, Copy, Debug)]
pub struct ResolveRunIdResult {
    /// The run ID.
    pub run_id: ReportUuid,
}
685
686impl RecordedRunStatus {
687    /// Returns the width (in decimal digits) needed to display the "passed" count.
688    ///
689    /// For non-completed runs (Incomplete, Unknown), returns 0 since they don't
690    /// display a passed count.
691    pub fn passed_count_width(&self) -> usize {
692        match self {
693            Self::Incomplete | Self::Unknown => 0,
694            Self::Completed(stats) | Self::Cancelled(stats) => decimal_char_width(stats.passed),
695            Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
696                decimal_char_width(stats.success_count)
697            }
698        }
699    }
700}
701
702impl RecordedRunInfo {
703    /// Checks whether this run can be replayed.
704    ///
705    /// This performs a comprehensive check of all conditions that might prevent
706    /// replay, including:
707    /// - Store format version compatibility
708    /// - Presence of required files (store.zip, run.log.zst)
709    /// - Run status (unknown, incomplete)
710    ///
711    /// The `files` parameter is used to check for file existence. Use
712    /// [`StoreRunsDir::run_files`] for runs in the store, or pass in a
713    /// [`PortableRecording`](super::PortableRecording) directly.
714    pub fn check_replayability(&self, files: &dyn RunFilesExist) -> ReplayabilityStatus {
715        let mut blocking = Vec::new();
716        let mut is_incomplete = false;
717
718        // Check store format version compatibility.
719        if let Err(incompatibility) = self
720            .store_format_version
721            .check_readable_by(STORE_FORMAT_VERSION)
722        {
723            blocking.push(NonReplayableReason::StoreVersionIncompatible { incompatibility });
724        }
725
726        // Check for required files.
727        if !files.store_zip_exists() {
728            blocking.push(NonReplayableReason::MissingStoreZip);
729        }
730        if !files.run_log_exists() {
731            blocking.push(NonReplayableReason::MissingRunLog);
732        }
733
734        // Check run status.
735        match &self.status {
736            RecordedRunStatus::Unknown => {
737                blocking.push(NonReplayableReason::StatusUnknown);
738            }
739            RecordedRunStatus::Incomplete => {
740                is_incomplete = true;
741            }
742            RecordedRunStatus::Completed(_)
743            | RecordedRunStatus::Cancelled(_)
744            | RecordedRunStatus::StressCompleted(_)
745            | RecordedRunStatus::StressCancelled(_) => {
746                // These statuses are fine for replay.
747            }
748        }
749
750        // Return the appropriate variant based on what we found.
751        if !blocking.is_empty() {
752            ReplayabilityStatus::NotReplayable(blocking)
753        } else if is_incomplete {
754            ReplayabilityStatus::Incomplete
755        } else {
756            ReplayabilityStatus::Replayable
757        }
758    }
759
760    /// Returns a display wrapper for this run.
761    ///
762    /// The `run_id_index` is used for computing shortest unique prefixes,
763    /// which are highlighted differently in the output (similar to jj).
764    ///
765    /// The `alignment` parameter controls column alignment when displaying a
766    /// list of runs. Use [`RunListAlignment::from_runs`] to precompute
767    /// alignment for a set of runs.
768    ///
769    /// The `redactor` parameter, if provided, redacts timestamps, durations,
770    /// and sizes for snapshot testing while preserving column alignment.
771    pub fn display<'a>(
772        &'a self,
773        run_id_index: &'a RunIdIndex,
774        replayability: &'a ReplayabilityStatus,
775        alignment: RunListAlignment,
776        styles: &'a Styles,
777        redactor: &'a Redactor,
778    ) -> DisplayRecordedRunInfo<'a> {
779        DisplayRecordedRunInfo::new(
780            self,
781            run_id_index,
782            replayability,
783            alignment,
784            styles,
785            redactor,
786        )
787    }
788
789    /// Returns a detailed display wrapper for this run.
790    ///
791    /// Unlike [`Self::display`] which shows a compact table row, this provides
792    /// a multi-line detailed view suitable for the `store info` command.
793    ///
794    /// The `replayability` parameter should be computed by the caller using
795    /// [`Self::check_replayability`].
796    ///
797    /// The `now` parameter is the current time, used to compute relative
798    /// durations (e.g. "30s ago").
799    ///
800    /// The `redactor` parameter redacts paths, timestamps, durations, and sizes
801    /// for snapshot testing. Use `Redactor::noop()` if no redaction is needed.
802    pub fn display_detailed<'a>(
803        &'a self,
804        run_id_index: &'a RunIdIndex,
805        replayability: &'a ReplayabilityStatus,
806        now: DateTime<Utc>,
807        styles: &'a Styles,
808        theme_characters: &'a ThemeCharacters,
809        redactor: &'a Redactor,
810    ) -> DisplayRecordedRunInfoDetailed<'a> {
811        DisplayRecordedRunInfoDetailed::new(
812            self,
813            run_id_index,
814            replayability,
815            now,
816            styles,
817            theme_characters,
818            redactor,
819        )
820    }
821}
822
/// Result of reading runs.json.zst.
struct ReadRunsJsonResult {
    // The deserialized run list (empty if the file didn't exist).
    runs: Vec<RecordedRunInfo>,
    // When pruning last ran, if recorded.
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest version may rewrite the file.
    write_permission: RunsJsonWritePermission,
}
829
830/// Reads and deserializes `runs.json.zst`, converting to the internal
831/// representation.
832fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
833    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
834    let file = match File::open(&runs_json_path) {
835        Ok(file) => file,
836        Err(error) => {
837            if error.kind() == io::ErrorKind::NotFound {
838                // The file doesn't exist yet, so we can write a new one.
839                return Ok(ReadRunsJsonResult {
840                    runs: Vec::new(),
841                    last_pruned_at: None,
842                    write_permission: RunsJsonWritePermission::Allowed,
843                });
844            } else {
845                return Err(RunStoreError::RunListRead {
846                    path: runs_json_path,
847                    error,
848                });
849            }
850        }
851    };
852
853    let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
854        path: runs_json_path.clone(),
855        error,
856    })?;
857
858    let list: RecordedRunList =
859        serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
860            path: runs_json_path,
861            error,
862        })?;
863    let write_permission = list.write_permission();
864    let data = list.into_data();
865    Ok(ReadRunsJsonResult {
866        runs: data.runs,
867        last_pruned_at: data.last_pruned_at,
868        write_permission,
869    })
870}
871
872/// Serializes and writes runs.json.zst from internal representation.
873fn write_runs_json(
874    runs_dir: &Utf8Path,
875    runs: &[RecordedRunInfo],
876    last_pruned_at: Option<DateTime<Utc>>,
877) -> Result<(), RunStoreError> {
878    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
879    let list = RecordedRunList::from_data(runs, last_pruned_at);
880
881    atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
882        .write(|file| {
883            // Use compression level 3, consistent with other zstd usage in the crate.
884            let mut encoder = zstd::stream::Encoder::new(file, 3)?;
885            serde_json::to_writer_pretty(&mut encoder, &list)?;
886            encoder.finish()?;
887            Ok(())
888        })
889        .map_err(|error| RunStoreError::RunListWrite {
890            path: runs_json_path,
891            error,
892        })?;
893
894    Ok(())
895}
896
/// A run store that has been locked for shared (read-only) access.
///
/// Multiple readers can hold this lock simultaneously, but it is exclusive
/// with the exclusive lock used for writing.
#[derive(Debug)]
pub struct SharedLockedRunStore<'store> {
    /// The runs directory this lock covers.
    runs_dir: StoreRunsDir<'store>,
    /// The open lock file; keeping it alive keeps the OS-level lock held.
    #[expect(dead_code, reason = "held for lock duration")]
    locked_file: DebugIgnore<File>,
    /// The recorded runs in the store.
    runs: Vec<RecordedRunInfo>,
    /// Whether this nextest may rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
    /// Index for resolving run ID prefixes.
    run_id_index: RunIdIndex,
}
910
911impl<'store> SharedLockedRunStore<'store> {
912    /// Returns a snapshot of the runs data, consuming self and releasing the
913    /// lock.
914    pub fn into_snapshot(self) -> RunStoreSnapshot {
915        RunStoreSnapshot {
916            runs_dir: self.runs_dir.as_path().to_owned(),
917            runs: self.runs,
918            write_permission: self.write_permission,
919            run_id_index: self.run_id_index,
920        }
921    }
922}
923
/// A snapshot of run store data.
#[derive(Debug)]
pub struct RunStoreSnapshot {
    /// Owned path to the runs directory.
    runs_dir: Utf8PathBuf,
    /// The recorded runs in the snapshot.
    runs: Vec<RecordedRunInfo>,
    /// Whether this nextest may rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
    /// Index for resolving run ID prefixes to full UUIDs.
    run_id_index: RunIdIndex,
}
932
933impl RunStoreSnapshot {
934    /// Returns the runs directory.
935    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
936        StoreRunsDir(&self.runs_dir)
937    }
938
939    /// Returns whether this nextest can write to the runs.json.zst file.
940    ///
941    /// If the file has a newer format version than we support, writing is denied.
942    pub fn write_permission(&self) -> RunsJsonWritePermission {
943        self.write_permission
944    }
945
946    /// Returns a list of recorded runs.
947    pub fn runs(&self) -> &[RecordedRunInfo] {
948        &self.runs
949    }
950
951    /// Returns the number of recorded runs.
952    pub fn run_count(&self) -> usize {
953        self.runs.len()
954    }
955
956    /// Returns the total compressed size of all recorded runs in bytes.
957    pub fn total_size(&self) -> u64 {
958        self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
959    }
960
961    /// Resolves a run ID selector to a run result.
962    ///
963    /// For [`RunIdSelector::Latest`], returns the most recent run by start
964    /// time.
965    /// For [`RunIdSelector::Prefix`], resolves the prefix to a specific run.
966    pub fn resolve_run_id(
967        &self,
968        selector: &RunIdSelector,
969    ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
970        match selector {
971            RunIdSelector::Latest => self.most_recent_run(),
972            RunIdSelector::Prefix(prefix) => {
973                let run_id = self.resolve_run_id_prefix(prefix)?;
974                Ok(ResolveRunIdResult { run_id })
975            }
976        }
977    }
978
979    /// Resolves a run ID prefix to a full UUID.
980    ///
981    /// The prefix must be a valid hexadecimal string. If the prefix matches
982    /// exactly one run, that run's UUID is returned. Otherwise, an error is
983    /// returned indicating whether no runs matched or multiple runs matched.
984    fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
985        self.run_id_index.resolve_prefix(prefix).map_err(|err| {
986            match err {
987                PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
988                    prefix: prefix.to_string(),
989                },
990                PrefixResolutionError::Ambiguous { count, candidates } => {
991                    // Convert UUIDs to full RecordedRunInfo and sort by start time (most recent first).
992                    let mut candidates: Vec<_> = candidates
993                        .into_iter()
994                        .filter_map(|run_id| self.get_run(run_id).cloned())
995                        .collect();
996                    candidates.sort_by(|a, b| b.started_at.cmp(&a.started_at));
997                    RunIdResolutionError::Ambiguous {
998                        prefix: prefix.to_string(),
999                        count,
1000                        candidates,
1001                        run_id_index: self.run_id_index.clone(),
1002                    }
1003                }
1004                PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
1005                    prefix: prefix.to_string(),
1006                },
1007            }
1008        })
1009    }
1010
1011    /// Returns the run ID index for computing shortest unique prefixes.
1012    pub fn run_id_index(&self) -> &RunIdIndex {
1013        &self.run_id_index
1014    }
1015
1016    /// Looks up a run by its exact UUID.
1017    pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
1018        self.runs.iter().find(|r| r.run_id == run_id)
1019    }
1020
1021    /// Returns the most recent run by start time.
1022    ///
1023    /// Returns an error if there are no runs at all.
1024    pub fn most_recent_run(&self) -> Result<ResolveRunIdResult, RunIdResolutionError> {
1025        self.runs
1026            .iter()
1027            .max_by_key(|r| r.started_at)
1028            .map(|r| ResolveRunIdResult { run_id: r.run_id })
1029            .ok_or(RunIdResolutionError::NoRuns)
1030    }
1031
1032    /// Computes which runs would be deleted by a prune operation.
1033    ///
1034    /// This is used for dry-run mode to show what would be deleted without
1035    /// actually deleting anything. Returns a [`PrunePlan`] containing the runs
1036    /// that would be deleted, sorted by start time (oldest first).
1037    pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
1038        PrunePlan::compute(&self.runs, policy)
1039    }
1040}
1041
/// A snapshot paired with precomputed replayability status for all runs.
///
/// This struct maintains the invariant that every run in the snapshot has a
/// corresponding entry in the replayability map. Use [`Self::new`] to compute
/// replayability for all runs, or `Self::new_for_test` for testing.
#[derive(Debug)]
pub struct SnapshotWithReplayability<'a> {
    /// The underlying snapshot of run store data.
    snapshot: &'a RunStoreSnapshot,
    /// Replayability status per run; invariant: one entry for every run in
    /// `snapshot`.
    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
    /// The most recent run by start time, if any runs exist.
    latest_run_id: Option<ReportUuid>,
}
1053
1054impl<'a> SnapshotWithReplayability<'a> {
1055    /// Creates a new snapshot with replayability by checking all runs.
1056    ///
1057    /// This computes [`ReplayabilityStatus`] for each run by checking file
1058    /// existence and format versions.
1059    pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
1060        let runs_dir = snapshot.runs_dir();
1061        let replayability: HashMap<_, _> = snapshot
1062            .runs()
1063            .iter()
1064            .map(|run| {
1065                (
1066                    run.run_id,
1067                    run.check_replayability(&runs_dir.run_files(run.run_id)),
1068                )
1069            })
1070            .collect();
1071
1072        // Find the latest run by time.
1073        let latest_run_id = snapshot.most_recent_run().ok().map(|r| r.run_id);
1074
1075        Self {
1076            snapshot,
1077            replayability,
1078            latest_run_id,
1079        }
1080    }
1081
1082    /// Returns a reference to the underlying snapshot.
1083    pub fn snapshot(&self) -> &'a RunStoreSnapshot {
1084        self.snapshot
1085    }
1086
1087    /// Returns the replayability map.
1088    pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
1089        &self.replayability
1090    }
1091
1092    /// Returns the replayability status for a specific run.
1093    ///
1094    /// # Panics
1095    ///
1096    /// Panics if the run ID is not in the snapshot. This maintains the
1097    /// invariant that all runs in the snapshot have replayability computed.
1098    pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
1099        self.replayability
1100            .get(&run_id)
1101            .expect("run ID should be in replayability map")
1102    }
1103
1104    /// Returns the ID of the most recent run by start time, if any.
1105    pub fn latest_run_id(&self) -> Option<ReportUuid> {
1106        self.latest_run_id
1107    }
1108}
1109
#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Creates a snapshot with replayability for testing.
    ///
    /// Every run is marked [`ReplayabilityStatus::Replayable`] by default.
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let mut replayability = HashMap::new();
        for run in snapshot.runs() {
            replayability.insert(run.run_id, ReplayabilityStatus::Replayable);
        }

        // In tests, "latest" is simply the most recent run by start time.
        let latest_run_id = snapshot
            .runs()
            .iter()
            .max_by_key(|run| run.started_at)
            .map(|run| run.run_id);

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}
1136
#[cfg(test)]
impl RunStoreSnapshot {
    /// Creates a new snapshot for testing, using a fixed fake runs
    /// directory.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        use super::run_id_index::RunIdIndex;

        Self {
            runs_dir: Utf8PathBuf::from("/test/runs"),
            run_id_index: RunIdIndex::new(&runs),
            runs,
            write_permission: RunsJsonWritePermission::Allowed,
        }
    }
}
1152
/// The kind of lock to acquire.
#[derive(Clone, Copy)]
enum LockKind {
    /// A shared (read) lock; multiple holders may coexist.
    Shared,
    /// An exclusive (write) lock; at most one holder at a time.
    Exclusive,
}
1159
1160/// Acquires a file lock with retries, timing out after 5 seconds.
1161///
1162/// This handles both brief contention (another nextest process finishing up)
1163/// and filesystems where locking may not work properly (e.g., NFS).
1164fn acquire_lock_with_retry(
1165    file: &File,
1166    lock_file_path: &Utf8Path,
1167    kind: LockKind,
1168) -> Result<(), RunStoreError> {
1169    const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
1170    const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1171
1172    let start = Instant::now();
1173    loop {
1174        let result = match kind {
1175            LockKind::Shared => file.try_lock_shared(),
1176            LockKind::Exclusive => file.try_lock(),
1177        };
1178
1179        match result {
1180            Ok(()) => return Ok(()),
1181            Err(TryLockError::WouldBlock) => {
1182                // Lock is held by another process. Retry if we haven't timed out.
1183                if start.elapsed() >= LOCK_TIMEOUT {
1184                    return Err(RunStoreError::FileLockTimeout {
1185                        path: lock_file_path.to_owned(),
1186                        timeout_secs: LOCK_TIMEOUT.as_secs(),
1187                    });
1188                }
1189                thread::sleep(LOCK_RETRY_INTERVAL);
1190            }
1191            Err(TryLockError::Error(error)) => {
1192                // Some other error (e.g., locking not supported on this filesystem).
1193                return Err(RunStoreError::FileLock {
1194                    path: lock_file_path.to_owned(),
1195                    error,
1196                });
1197            }
1198        }
1199    }
1200}