nextest_runner/record/

// Copyright (c) The nextest Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Run store management for nextest recordings.
//!
//! The run store is a directory that contains all recorded test runs. It provides:
//!
//! - A lock file for exclusive access during modifications.
//! - A zstd-compressed JSON file (`runs.json.zst`) listing all recorded runs.
//! - Individual directories for each run containing the archive and log.
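//!
//! A sketch of the expected on-disk layout, based on the file names used in
//! this module (each run directory is named after its run UUID):
//!
//! ```text
//! <store>/runs/
//! ├── runs.lock          # lock file guarding modifications
//! ├── runs.json.zst      # zstd-compressed list of recorded runs
//! └── <run-id>/
//!     ├── store.zip      # the run's recorded archive
//!     └── run.log.zst    # the run's recorded log
//! ```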

use super::{
    display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
    format::{
        RECORD_FORMAT_VERSION, RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission,
        STORE_ZIP_FILE_NAME,
    },
    recorder::{RunRecorder, StoreSizes},
    retention::{
        PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
    },
    run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
};
use crate::{
    errors::{RunIdResolutionError, RunStoreError},
    helpers::{ThemeCharacters, u32_decimal_char_width, usize_decimal_char_width},
    redact::Redactor,
};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
use debug_ignore::DebugIgnore;
use quick_junit::ReportUuid;
use semver::Version;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    fmt,
    fs::{File, TryLockError},
    io,
    num::NonZero,
    thread,
    time::{Duration, Instant},
};

static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";

/// A reference to the runs directory in a run store.
///
/// This provides methods to compute paths within the runs directory.
#[derive(Clone, Copy, Debug)]
pub struct StoreRunsDir<'a>(&'a Utf8Path);

impl<'a> StoreRunsDir<'a> {
    /// Returns the path to a specific run's directory.
    pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
        self.0.join(run_id.to_string())
    }

    /// Returns the underlying path to the runs directory.
    pub fn as_path(self) -> &'a Utf8Path {
        self.0
    }
}

/// Manages the storage of recorded test runs.
///
/// The run store is a directory containing a list of recorded runs and their data.
/// Use [`RunStore::lock_exclusive`] to acquire exclusive access before creating
/// new runs.
#[derive(Debug)]
pub struct RunStore {
    runs_dir: Utf8PathBuf,
}

impl RunStore {
    /// Creates a new `RunStore` at the given directory.
    ///
    /// Creates the directory if it doesn't exist.
    pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
        let runs_dir = store_dir.join("runs");
        std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
            run_dir: runs_dir.clone(),
            error,
        })?;

        Ok(Self { runs_dir })
    }

    /// Returns the runs directory.
    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
        StoreRunsDir(&self.runs_dir)
    }

    /// Acquires a shared lock on the run store for reading.
    ///
    /// Multiple readers can hold the shared lock simultaneously, but the shared
    /// lock is exclusive with the exclusive lock (used for writing).
    ///
    /// Uses non-blocking lock attempts with retries to handle both brief
    /// contention and filesystems where locking may not work (e.g., NFS).
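    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); the store directory is a
    /// placeholder, and errors are propagated with `?`:
    ///
    /// ```ignore
    /// let store = RunStore::new(Utf8Path::new("/path/to/store"))?;
    /// // Take the shared lock, then release it by converting into a snapshot.
    /// let snapshot = store.lock_shared()?.into_snapshot();
    /// for run in snapshot.runs() {
    ///     println!("{} ({})", run.run_id, run.status.short_status_str());
    /// }
    /// ```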
    pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(false)
            .write(true)
            .open(&lock_file_path)
            .map_err(|error| RunStoreError::FileLock {
                path: lock_file_path.clone(),
                error,
            })?;

        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
        let result = read_runs_json(&self.runs_dir)?;
        let run_id_index = RunIdIndex::new(&result.runs);

        Ok(SharedLockedRunStore {
            runs_dir: StoreRunsDir(&self.runs_dir),
            locked_file: DebugIgnore(file),
            runs: result.runs,
            write_permission: result.write_permission,
            run_id_index,
        })
    }

    /// Acquires an exclusive lock on the run store.
    ///
    /// This lock should only be held for a short duration (just long enough to
    /// add a run to the list and create its directory).
    ///
    /// Uses non-blocking lock attempts with retries to handle both brief
    /// contention and filesystems where locking may not work (e.g., NFS).
    pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(false)
            .write(true)
            .open(&lock_file_path)
            .map_err(|error| RunStoreError::FileLock {
                path: lock_file_path.clone(),
                error,
            })?;

        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
        let result = read_runs_json(&self.runs_dir)?;

        Ok(ExclusiveLockedRunStore {
            runs_dir: StoreRunsDir(&self.runs_dir),
            locked_file: DebugIgnore(file),
            runs: result.runs,
            last_pruned_at: result.last_pruned_at,
            write_permission: result.write_permission,
        })
    }
}

/// A run store that has been locked for exclusive access.
///
/// The lifetime parameter ensures this isn't held for longer than the
/// corresponding [`RunStore`].
#[derive(Debug)]
pub struct ExclusiveLockedRunStore<'store> {
    runs_dir: StoreRunsDir<'store>,
    // Held for RAII lock semantics; the lock is released when this struct is dropped.
    #[expect(dead_code)]
    locked_file: DebugIgnore<File>,
    runs: Vec<RecordedRunInfo>,
    last_pruned_at: Option<DateTime<Utc>>,
    write_permission: RunsJsonWritePermission,
}

impl<'store> ExclusiveLockedRunStore<'store> {
    /// Returns the runs directory.
    pub fn runs_dir(&self) -> StoreRunsDir<'store> {
        self.runs_dir
    }

    /// Returns whether this nextest can write to the runs.json.zst file.
    ///
    /// If the file has a newer format version than we support, writing is denied.
    pub fn write_permission(&self) -> RunsJsonWritePermission {
        self.write_permission
    }

    /// Marks a run as completed and persists the change to disk.
    ///
    /// Updates sizes, `status`, and `duration_secs` to the given values.
    /// Returns `true` if the run was found and updated, `false` if no run
    /// with the given ID exists (in which case nothing is persisted).
    ///
    /// Returns an error if writing is denied due to a format version mismatch.
    ///
    /// The status should not be `Incomplete` since we're completing the run.
    pub fn complete_run(
        &mut self,
        run_id: ReportUuid,
        sizes: StoreSizes,
        status: RecordedRunStatus,
        duration_secs: Option<f64>,
    ) -> Result<bool, RunStoreError> {
        if let RunsJsonWritePermission::Denied {
            file_version,
            max_supported_version,
        } = self.write_permission
        {
            return Err(RunStoreError::FormatVersionTooNew {
                file_version,
                max_supported_version,
            });
        }

        let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
        if found {
            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
        }
        Ok(found)
    }

    /// Updates a run's metadata in memory.
    fn mark_run_completed_inner(
        &mut self,
        run_id: ReportUuid,
        sizes: StoreSizes,
        status: RecordedRunStatus,
        duration_secs: Option<f64>,
    ) -> bool {
        for run in &mut self.runs {
            if run.run_id == run_id {
                run.sizes = RecordedSizes {
                    log: ComponentSizes {
                        compressed: sizes.log.compressed,
                        uncompressed: sizes.log.uncompressed,
                        entries: sizes.log.entries,
                    },
                    store: ComponentSizes {
                        compressed: sizes.store.compressed,
                        uncompressed: sizes.store.uncompressed,
                        entries: sizes.store.entries,
                    },
                };
                run.status = status;
                run.duration_secs = duration_secs;
                run.last_written_at = Local::now().fixed_offset();
                return true;
            }
        }
        false
    }

    /// Prunes runs according to the given retention policy.
    ///
    /// This method:
    /// 1. Determines which runs to delete based on the policy
    /// 2. Deletes those run directories from disk
    /// 3. Deletes any orphaned directories not tracked in runs.json.zst
    /// 4. Updates the run list in memory and on disk
    ///
    /// The `kind` parameter indicates whether this is explicit pruning (from a
    /// user command) or implicit pruning (automatic during recording). This
    /// affects how errors are displayed.
    ///
    /// Returns the result of the pruning operation, including any errors that
    /// occurred while deleting individual runs.
    ///
    /// Returns an error if writing is denied due to a format version mismatch.
    pub fn prune(
        &mut self,
        policy: &RecordRetentionPolicy,
        kind: PruneKind,
    ) -> Result<PruneResult, RunStoreError> {
        if let RunsJsonWritePermission::Denied {
            file_version,
            max_supported_version,
        } = self.write_permission
        {
            return Err(RunStoreError::FormatVersionTooNew {
                file_version,
                max_supported_version,
            });
        }

        let now = Utc::now();
        let to_delete: HashSet<_> = policy
            .compute_runs_to_delete(&self.runs, now)
            .into_iter()
            .collect();

        let runs_dir = self.runs_dir();
        let mut result = if to_delete.is_empty() {
            PruneResult::default()
        } else {
            delete_runs(runs_dir, &mut self.runs, &to_delete)
        };
        result.kind = kind;

        let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
        delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);

        if result.deleted_count > 0 || result.orphans_deleted > 0 {
            // Update last_pruned_at since we performed pruning.
            self.last_pruned_at = Some(now);
            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
        }

        Ok(result)
    }

    /// Prunes runs if needed, based on time since last prune and limit thresholds.
    ///
    /// This method implements implicit pruning, which occurs:
    /// - If at least 1 day has passed since the last prune, OR
    /// - If any retention limit is exceeded by a factor of 1.5.
    ///
    /// Use [`Self::prune`] for explicit pruning that always runs regardless of these conditions.
    ///
    /// Returns `Ok(None)` if pruning was skipped, `Ok(Some(result))` if pruning occurred.
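    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a retention
    /// policy has already been configured elsewhere. The `deleted_count` field
    /// on [`PruneResult`] is used the same way as in [`Self::prune`] above:
    ///
    /// ```ignore
    /// fn prune_after_recording(
    ///     store: &RunStore,
    ///     policy: &RecordRetentionPolicy,
    /// ) -> Result<(), RunStoreError> {
    ///     let mut locked = store.lock_exclusive()?;
    ///     if let Some(result) = locked.prune_if_needed(policy)? {
    ///         // Implicit pruning ran; `result` describes what was deleted.
    ///         eprintln!("deleted {} runs", result.deleted_count);
    ///     }
    ///     Ok(())
    /// }
    /// ```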
    pub fn prune_if_needed(
        &mut self,
        policy: &RecordRetentionPolicy,
    ) -> Result<Option<PruneResult>, RunStoreError> {
        const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
            Some(d) => d,
            None => panic!("1 day should always be a valid TimeDelta"),
        };
        const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;

        let now = Utc::now();

        // Check if pruning is needed.
        let time_since_last_prune = self
            .last_pruned_at
            .map(|last| now.signed_duration_since(last))
            .unwrap_or(TimeDelta::MAX);

        let should_prune = time_since_last_prune >= PRUNE_INTERVAL
            || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);

        if should_prune {
            Ok(Some(self.prune(policy, PruneKind::Implicit)?))
        } else {
            Ok(None)
        }
    }

    /// Creates a run recorder for a new run.
    ///
    /// Adds the run to the list and creates its directory. Consumes self,
    /// dropping the exclusive lock.
    ///
    /// `max_output_size` specifies the maximum size of a single output (stdout/stderr)
    /// before truncation.
    ///
    /// Returns the recorder and the shortest unique prefix for the run ID (for
    /// display purposes), or an error if writing is denied due to a format
    /// version mismatch.
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn create_run_recorder(
        mut self,
        run_id: ReportUuid,
        nextest_version: Version,
        started_at: DateTime<FixedOffset>,
        cli_args: Vec<String>,
        build_scope_args: Vec<String>,
        env_vars: BTreeMap<String, String>,
        max_output_size: bytesize::ByteSize,
        parent_run_id: Option<ReportUuid>,
    ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
        if let RunsJsonWritePermission::Denied {
            file_version,
            max_supported_version,
        } = self.write_permission
        {
            return Err(RunStoreError::FormatVersionTooNew {
                file_version,
                max_supported_version,
            });
        }

        // Add to the list of runs before creating the directory. This ensures
        // that if creation fails, an empty run directory isn't left behind. (It
        // does mean that there may be spurious entries in the list of runs,
        // which will be dealt with during pruning.)

        let now = Local::now().fixed_offset();
        let run = RecordedRunInfo {
            run_id,
            store_format_version: RECORD_FORMAT_VERSION,
            nextest_version,
            started_at,
            last_written_at: now,
            duration_secs: None,
            cli_args,
            build_scope_args,
            env_vars,
            parent_run_id,
            sizes: RecordedSizes::default(),
            status: RecordedRunStatus::Incomplete,
        };
        self.runs.push(run);

        // If the parent run ID is set, update its last written at time.
        if let Some(parent_run_id) = parent_run_id
            && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
        {
            parent_run.last_written_at = now;
        }

        write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;

        // Compute the unique prefix now that the run is in the list.
        let index = RunIdIndex::new(&self.runs);
        let unique_prefix = index
            .shortest_unique_prefix(run_id)
            .expect("run was just added to the list");

        // Create the run directory while still holding the lock. This prevents
        // a race where another process could prune the newly-added run entry
        // before the directory exists, leaving an orphaned directory. The lock
        // is released when `self` is dropped.
        let run_dir = self.runs_dir().run_dir(run_id);

        let recorder = RunRecorder::new(run_dir, max_output_size)?;
        Ok((recorder, unique_prefix))
    }
}

/// Information about a recorded run.
#[derive(Clone, Debug)]
pub struct RecordedRunInfo {
    /// The unique identifier for this run.
    pub run_id: ReportUuid,
    /// The format version of this run's store.zip archive.
    ///
    /// This allows checking replayability without opening the archive.
    pub store_format_version: u32,
    /// The version of nextest that created this run.
    pub nextest_version: Version,
    /// When the run started.
    pub started_at: DateTime<FixedOffset>,
    /// When this run was last written to.
    ///
    /// Used for LRU eviction. Updated when the run is created, when the run
    /// completes, and when a rerun references this run.
    pub last_written_at: DateTime<FixedOffset>,
    /// Duration of the run in seconds.
    ///
    /// This is `None` for incomplete runs.
    pub duration_secs: Option<f64>,
    /// The command-line arguments used to invoke nextest.
    pub cli_args: Vec<String>,
    /// Build scope arguments (package and target selection).
    ///
    /// These determine which packages and targets are built. In a rerun chain,
    /// these are inherited from the original run unless explicitly overridden.
    pub build_scope_args: Vec<String>,
    /// Environment variables that affect nextest behavior (NEXTEST_* and CARGO_*).
    pub env_vars: BTreeMap<String, String>,
    /// If this is a rerun, the ID of the parent run.
    ///
    /// This forms a chain for iterative fix-and-rerun workflows.
    pub parent_run_id: Option<ReportUuid>,
    /// Sizes broken down by component (log and store).
    pub sizes: RecordedSizes,
    /// The status and statistics for this run.
    pub status: RecordedRunStatus,
}

/// Sizes broken down by component (log and store).
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log (run.log.zst).
    pub log: ComponentSizes,
    /// Sizes for the store archive (store.zip).
    pub store: ComponentSizes,
}

/// Compressed and uncompressed sizes for a single component.
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Compressed size in bytes.
    pub compressed: u64,
    /// Uncompressed size in bytes.
    pub uncompressed: u64,
    /// Number of entries (records for log, files for store).
    pub entries: u64,
}

impl RecordedSizes {
    /// Returns the total compressed size (log + store).
    pub fn total_compressed(&self) -> u64 {
        self.log.compressed + self.store.compressed
    }

    /// Returns the total uncompressed size (log + store).
    pub fn total_uncompressed(&self) -> u64 {
        self.log.uncompressed + self.store.uncompressed
    }

    /// Returns the total number of entries (log records + store files).
    pub fn total_entries(&self) -> u64 {
        self.log.entries + self.store.entries
    }
}

/// Status and statistics for a recorded run.
#[derive(Clone, Debug)]
pub enum RecordedRunStatus {
    /// The run was interrupted before completion.
    Incomplete,
    /// A normal test run completed (all tests finished).
    Completed(CompletedRunStats),
    /// A normal test run was cancelled before all tests finished.
    Cancelled(CompletedRunStats),
    /// A stress test run completed (all iterations finished).
    StressCompleted(StressCompletedRunStats),
    /// A stress test run was cancelled before all iterations finished.
    StressCancelled(StressCompletedRunStats),
    /// An unknown status from a newer version of nextest.
    ///
    /// This variant is used for forward compatibility when reading runs.json.zst
    /// files created by newer nextest versions that may have new status types.
    Unknown,
}

impl RecordedRunStatus {
    /// Returns a short status string for display.
    pub fn short_status_str(&self) -> &'static str {
        match self {
            Self::Incomplete => "incomplete",
            Self::Unknown => "unknown",
            Self::Completed(_) => "completed",
            Self::Cancelled(_) => "cancelled",
            Self::StressCompleted(_) => "stress completed",
            Self::StressCancelled(_) => "stress cancelled",
        }
    }

    /// Returns the exit code for completed runs, or `None` for incomplete/unknown runs.
    pub fn exit_code(&self) -> Option<i32> {
        match self {
            Self::Incomplete | Self::Unknown => None,
            Self::Completed(stats) | Self::Cancelled(stats) => Some(stats.exit_code),
            Self::StressCompleted(stats) | Self::StressCancelled(stats) => Some(stats.exit_code),
        }
    }
}

/// Statistics for a normal test run that finished (completed or cancelled).
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// The number of tests that were expected to run.
    pub initial_run_count: usize,
    /// The number of tests that passed.
    pub passed: usize,
    /// The number of tests that failed (including exec failures and timeouts).
    pub failed: usize,
    /// The exit code from the run.
    pub exit_code: i32,
}

/// Statistics for a stress test run that finished (completed or cancelled).
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// The number of stress iterations that were expected to run, if known.
    ///
    /// This is `None` when the stress test was run without a fixed iteration count
    /// (e.g., `--stress-duration`).
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// The number of stress iterations that succeeded.
    pub success_count: u32,
    /// The number of stress iterations that failed.
    pub failed_count: u32,
    /// The exit code from the run.
    pub exit_code: i32,
}

// ---
// Replayability checking
// ---

/// The result of checking whether a run can be replayed.
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// The run is definitely replayable.
    ///
    /// No blocking reasons and no uncertain conditions.
    Replayable,
    /// The run is definitely not replayable.
    ///
    /// Contains at least one blocking reason.
    NotReplayable(Vec<NonReplayableReason>),
    /// The run might be replayable but is incomplete.
    ///
    /// The archive might be usable, but we'd need to open `store.zip` to
    /// verify all expected files are present.
    Incomplete,
}

/// A definite reason why a run cannot be replayed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run's store format version is newer than this nextest supports.
    ///
    /// This nextest version cannot read the archive format.
    StoreFormatTooNew {
        /// The format version in the run's archive.
        run_version: u32,
        /// The maximum format version this nextest supports.
        max_supported: u32,
    },
    /// The `store.zip` file is missing from the run directory.
    MissingStoreZip,
    /// The `run.log.zst` file is missing from the run directory.
    MissingRunLog,
    /// The run status is `Unknown` (from a newer nextest version).
    ///
    /// We cannot safely replay since we don't understand the run's state.
    StatusUnknown,
}

impl fmt::Display for NonReplayableReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::StoreFormatTooNew {
                run_version,
                max_supported,
            } => {
                write!(
                    f,
                    "store format version {} is newer than supported (version {})",
                    run_version, max_supported
                )
            }
            Self::MissingStoreZip => {
                write!(f, "store.zip is missing")
            }
            Self::MissingRunLog => {
                write!(f, "run.log.zst is missing")
            }
            Self::StatusUnknown => {
                write!(f, "run status is unknown (from a newer nextest version)")
            }
        }
    }
}

/// Result of looking up the most recent replayable run.
#[derive(Clone, Copy, Debug)]
pub struct ResolveRunIdResult {
    /// The run ID of the most recent replayable run.
    pub run_id: ReportUuid,
    /// The number of newer runs that are not replayable.
    pub newer_non_replayable_count: usize,
}

impl RecordedRunStatus {
    /// Returns the width (in decimal digits) needed to display the "passed" count.
    ///
    /// For non-completed runs (Incomplete, Unknown), returns 0 since they don't
    /// display a passed count.
    pub fn passed_count_width(&self) -> usize {
        match self {
            Self::Incomplete | Self::Unknown => 0,
            Self::Completed(stats) | Self::Cancelled(stats) => {
                usize_decimal_char_width(stats.passed)
            }
            Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
                // Stress iteration counts are u32, so use the u32-specific width helper.
                u32_decimal_char_width(stats.success_count)
            }
        }
    }
}

impl RecordedRunInfo {
    /// Checks whether this run can be replayed.
    ///
    /// This performs a comprehensive check of all conditions that might prevent
    /// replay, including:
    /// - Store format version compatibility
    /// - Presence of required files (store.zip, run.log.zst)
    /// - Run status (unknown, incomplete)
    ///
    /// The `runs_dir` parameter is used to check for file existence on disk.
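    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest) of reacting to the result,
    /// given a `snapshot` obtained from [`SharedLockedRunStore::into_snapshot`]:
    ///
    /// ```ignore
    /// match run.check_replayability(snapshot.runs_dir()) {
    ///     ReplayabilityStatus::Replayable => { /* safe to replay */ }
    ///     ReplayabilityStatus::Incomplete => { /* might work; needs deeper inspection */ }
    ///     ReplayabilityStatus::NotReplayable(reasons) => {
    ///         for reason in &reasons {
    ///             eprintln!("cannot replay: {reason}");
    ///         }
    ///     }
    /// }
    /// ```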
    pub fn check_replayability(&self, runs_dir: StoreRunsDir<'_>) -> ReplayabilityStatus {
        let mut blocking = Vec::new();
        let mut is_incomplete = false;

        // Check store format version.
        if self.store_format_version > RECORD_FORMAT_VERSION {
            blocking.push(NonReplayableReason::StoreFormatTooNew {
                run_version: self.store_format_version,
                max_supported: RECORD_FORMAT_VERSION,
            });
        }
        // Note: When we bump format versions, add a similar StoreFormatTooOld
        // check here.

        // Check for required files on disk.
        let run_dir = runs_dir.run_dir(self.run_id);
        let store_zip_path = run_dir.join(STORE_ZIP_FILE_NAME);
        let run_log_path = run_dir.join(RUN_LOG_FILE_NAME);

        if !store_zip_path.exists() {
            blocking.push(NonReplayableReason::MissingStoreZip);
        }
        if !run_log_path.exists() {
            blocking.push(NonReplayableReason::MissingRunLog);
        }

        // Check run status.
        match &self.status {
            RecordedRunStatus::Unknown => {
                blocking.push(NonReplayableReason::StatusUnknown);
            }
            RecordedRunStatus::Incomplete => {
                is_incomplete = true;
            }
            RecordedRunStatus::Completed(_)
            | RecordedRunStatus::Cancelled(_)
            | RecordedRunStatus::StressCompleted(_)
            | RecordedRunStatus::StressCancelled(_) => {
                // These statuses are fine for replay.
            }
        }

        // Return the appropriate variant based on what we found.
        if !blocking.is_empty() {
            ReplayabilityStatus::NotReplayable(blocking)
        } else if is_incomplete {
            ReplayabilityStatus::Incomplete
        } else {
            ReplayabilityStatus::Replayable
        }
    }

    /// Returns a display wrapper for this run.
    ///
    /// The `run_id_index` is used for computing shortest unique prefixes,
    /// which are highlighted differently in the output (similar to jj).
    ///
    /// The `alignment` parameter controls column alignment when displaying a
    /// list of runs. Use [`RunListAlignment::from_runs`] to precompute
    /// alignment for a set of runs.
    ///
    /// The `redactor` parameter, if provided, redacts timestamps, durations,
    /// and sizes for snapshot testing while preserving column alignment.
    pub fn display<'a>(
        &'a self,
        run_id_index: &'a RunIdIndex,
        replayability: &'a ReplayabilityStatus,
        alignment: RunListAlignment,
        styles: &'a Styles,
        redactor: &'a Redactor,
    ) -> DisplayRecordedRunInfo<'a> {
        DisplayRecordedRunInfo::new(
            self,
            run_id_index,
            replayability,
            alignment,
            styles,
            redactor,
        )
    }

    /// Returns a detailed display wrapper for this run.
    ///
    /// Unlike [`Self::display`] which shows a compact table row, this provides
    /// a multi-line detailed view suitable for the `store info` command.
    ///
    /// The `replayability` parameter should be computed by the caller using
    /// [`Self::check_replayability`].
    ///
    /// The `now` parameter is the current time, used to compute relative
    /// durations (e.g. "30s ago").
    ///
    /// The `redactor` parameter redacts paths, timestamps, durations, and sizes
    /// for snapshot testing. Use `Redactor::noop()` if no redaction is needed.
    pub fn display_detailed<'a>(
        &'a self,
        run_id_index: &'a RunIdIndex,
        replayability: &'a ReplayabilityStatus,
        now: DateTime<Utc>,
        styles: &'a Styles,
        theme_characters: &'a ThemeCharacters,
        redactor: &'a Redactor,
    ) -> DisplayRecordedRunInfoDetailed<'a> {
        DisplayRecordedRunInfoDetailed::new(
            self,
            run_id_index,
            replayability,
            now,
            styles,
            theme_characters,
            redactor,
        )
    }
}

/// Result of reading runs.json.zst.
struct ReadRunsJsonResult {
    runs: Vec<RecordedRunInfo>,
    last_pruned_at: Option<DateTime<Utc>>,
    write_permission: RunsJsonWritePermission,
}

/// Reads and deserializes `runs.json.zst`, converting to the internal
/// representation.
fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
    let file = match File::open(&runs_json_path) {
        Ok(file) => file,
        Err(error) => {
            if error.kind() == io::ErrorKind::NotFound {
                // The file doesn't exist yet, so we can write a new one.
                return Ok(ReadRunsJsonResult {
                    runs: Vec::new(),
                    last_pruned_at: None,
                    write_permission: RunsJsonWritePermission::Allowed,
                });
            } else {
                return Err(RunStoreError::RunListRead {
                    path: runs_json_path,
                    error,
                });
            }
        }
    };

    let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
        path: runs_json_path.clone(),
        error,
    })?;

    let list: RecordedRunList =
        serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
            path: runs_json_path,
            error,
        })?;
    let write_permission = list.write_permission();
    let data = list.into_data();
    Ok(ReadRunsJsonResult {
        runs: data.runs,
        last_pruned_at: data.last_pruned_at,
        write_permission,
    })
}

/// Serializes and writes runs.json.zst from internal representation.
fn write_runs_json(
    runs_dir: &Utf8Path,
    runs: &[RecordedRunInfo],
    last_pruned_at: Option<DateTime<Utc>>,
) -> Result<(), RunStoreError> {
    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
    let list = RecordedRunList::from_data(runs, last_pruned_at);

    atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
        .write(|file| {
            // Use compression level 3, consistent with other zstd usage in the crate.
            let mut encoder = zstd::stream::Encoder::new(file, 3)?;
            serde_json::to_writer_pretty(&mut encoder, &list)?;
            encoder.finish()?;
            Ok(())
        })
        .map_err(|error| RunStoreError::RunListWrite {
            path: runs_json_path,
            error,
        })?;

    Ok(())
}

/// A run store that has been locked for shared (read-only) access.
///
/// Multiple readers can hold this lock simultaneously, but it is exclusive
/// with the exclusive lock used for writing.
#[derive(Debug)]
pub struct SharedLockedRunStore<'store> {
    runs_dir: StoreRunsDir<'store>,
    #[expect(dead_code, reason = "held for lock duration")]
    locked_file: DebugIgnore<File>,
    runs: Vec<RecordedRunInfo>,
    write_permission: RunsJsonWritePermission,
    run_id_index: RunIdIndex,
}

impl<'store> SharedLockedRunStore<'store> {
    /// Returns a snapshot of the runs data, consuming self and releasing the
    /// lock.
    pub fn into_snapshot(self) -> RunStoreSnapshot {
        RunStoreSnapshot {
            runs_dir: self.runs_dir.as_path().to_owned(),
            runs: self.runs,
            write_permission: self.write_permission,
            run_id_index: self.run_id_index,
        }
    }
}

/// A snapshot of run store data.
#[derive(Debug)]
pub struct RunStoreSnapshot {
    runs_dir: Utf8PathBuf,
    runs: Vec<RecordedRunInfo>,
    write_permission: RunsJsonWritePermission,
    run_id_index: RunIdIndex,
}

impl RunStoreSnapshot {
    /// Returns the runs directory.
    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
        StoreRunsDir(&self.runs_dir)
    }

    /// Returns whether this nextest can write to the runs.json.zst file.
    ///
    /// If the file has a newer format version than we support, writing is denied.
    pub fn write_permission(&self) -> RunsJsonWritePermission {
        self.write_permission
    }

    /// Returns a list of recorded runs.
    pub fn runs(&self) -> &[RecordedRunInfo] {
        &self.runs
    }

    /// Returns the number of recorded runs.
    pub fn run_count(&self) -> usize {
        self.runs.len()
    }

    /// Returns the total compressed size of all recorded runs in bytes.
    pub fn total_size(&self) -> u64 {
        self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
    }

    /// Resolves a run ID selector to a run result.
    ///
    /// For [`RunIdSelector::Latest`], returns the most recent replayable run.
    /// For [`RunIdSelector::Prefix`], resolves the prefix to a specific run.
    ///
    /// Returns a [`ResolveRunIdResult`] containing the run ID and, for
    /// `Latest`, the count of newer non-replayable runs that were skipped.
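    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// // Resolve "the latest replayable run", reporting any skipped newer runs.
    /// let resolved = snapshot.resolve_run_id(&RunIdSelector::Latest)?;
    /// if resolved.newer_non_replayable_count > 0 {
    ///     eprintln!(
    ///         "skipped {} newer non-replayable run(s)",
    ///         resolved.newer_non_replayable_count,
    ///     );
    /// }
    /// let run = snapshot.get_run(resolved.run_id);
    /// ```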
    pub fn resolve_run_id(
        &self,
        selector: &RunIdSelector,
    ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
        match selector {
            RunIdSelector::Latest => self.most_recent_run(None),
            RunIdSelector::Prefix(prefix) => {
                let run_id = self.resolve_run_id_prefix(prefix)?;
                Ok(ResolveRunIdResult {
                    run_id,
                    newer_non_replayable_count: 0,
                })
            }
        }
    }

    /// Resolves a run ID prefix to a full UUID.
    ///
    /// The prefix must be a valid hexadecimal string. If the prefix matches
    /// exactly one run, that run's UUID is returned. Otherwise, an error is
    /// returned indicating whether no runs matched or multiple runs matched.
    fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
        self.run_id_index.resolve_prefix(prefix).map_err(|err| {
            match err {
                PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
                    prefix: prefix.to_string(),
                },
                PrefixResolutionError::Ambiguous { count, candidates } => {
                    // Convert UUIDs to full RecordedRunInfo and sort by start time (most recent first).
                    let mut candidates: Vec<_> = candidates
                        .into_iter()
                        .filter_map(|run_id| self.get_run(run_id).cloned())
                        .collect();
                    candidates.sort_by(|a, b| b.started_at.cmp(&a.started_at));
                    RunIdResolutionError::Ambiguous {
                        prefix: prefix.to_string(),
                        count,
                        candidates,
                        run_id_index: self.run_id_index.clone(),
                    }
                }
                PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
                    prefix: prefix.to_string(),
                },
            }
        })
    }

    /// Returns the run ID index for computing shortest unique prefixes.
    pub fn run_id_index(&self) -> &RunIdIndex {
        &self.run_id_index
    }

    /// Looks up a run by its exact UUID.
    pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
        self.runs.iter().find(|r| r.run_id == run_id)
    }

    /// Returns the most recent replayable run.
    ///
    /// Runs are considered in descending order of start time, and the first
    /// definitely replayable run is returned. Replayability is checked via
    /// [`RecordedRunInfo::check_replayability`].
    ///
    /// If there are newer runs that are not replayable, those are counted and
    /// returned in the result.
    ///
    /// Returns an error if there are no runs at all, or if there are runs but
    /// none are replayable.
    pub fn most_recent_run(
        &self,
        replayability: Option<&HashMap<ReportUuid, ReplayabilityStatus>>,
    ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
        if self.runs.is_empty() {
            return Err(RunIdResolutionError::NoRuns);
        }

        // Sort runs by started_at in descending order (most recent first).
        let mut sorted_runs: Vec<_> = self.runs.iter().collect();
        sorted_runs.sort_by(|a, b| b.started_at.cmp(&a.started_at));

        // Find the first replayable run and count non-replayable runs before
        // it.
        let runs_dir = self.runs_dir();
        let mut newer_non_replayable_count = 0;
        for run in sorted_runs {
            let is_replayable = match replayability {
                Some(replayability) => {
                    // If the replayability index is provided, use it.
                    let replayability = replayability.get(&run.run_id).unwrap_or_else(|| {
                        panic!("replayability index should have run ID {}", run.run_id)
                    });
                    matches!(replayability, &ReplayabilityStatus::Replayable)
                }
                None => {
                    // If the replayability index is not provided, then we do
                    // I/O to check replayability.
                    matches!(
                        run.check_replayability(runs_dir),
                        ReplayabilityStatus::Replayable
                    )
                }
            };
            if is_replayable {
                return Ok(ResolveRunIdResult {
                    run_id: run.run_id,
                    newer_non_replayable_count,
                });
            }
            newer_non_replayable_count += 1;
        }

        Err(RunIdResolutionError::NoReplayableRuns {
            non_replayable_count: newer_non_replayable_count,
        })
    }

    /// Computes which runs would be deleted by a prune operation.
    ///
    /// This is used for dry-run mode to show what would be deleted without
    /// actually deleting anything. Returns a [`PrunePlan`] containing the runs
    /// that would be deleted, sorted by start time (oldest first).
    pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
        PrunePlan::compute(&self.runs, policy)
    }
}

/// A snapshot paired with precomputed replayability status for all runs.
///
/// This struct maintains the invariant that every run in the snapshot has a
/// corresponding entry in the replayability map. Use [`Self::new`] to compute
/// replayability for all runs, or `Self::new_for_test` for testing.
#[derive(Debug)]
pub struct SnapshotWithReplayability<'a> {
    snapshot: &'a RunStoreSnapshot,
    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
    latest_run_id: Option<ReportUuid>,
}

impl<'a> SnapshotWithReplayability<'a> {
    /// Creates a new snapshot with replayability by checking all runs.
    ///
    /// This computes [`ReplayabilityStatus`] for each run by checking file
    /// existence and format versions.
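    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// let with_replayability = SnapshotWithReplayability::new(&snapshot);
    /// if let Some(latest) = with_replayability.latest_run_id() {
    ///     // The invariant guarantees a status entry for every run in the snapshot.
    ///     let status = with_replayability.get_replayability(latest);
    /// }
    /// ```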
    pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
        let runs_dir = snapshot.runs_dir();
        let replayability: HashMap<_, _> = snapshot
            .runs()
            .iter()
            .map(|run| (run.run_id, run.check_replayability(runs_dir)))
            .collect();

        // Find the latest replayable run.
        let latest_run_id = snapshot
            .most_recent_run(Some(&replayability))
            .ok()
            .map(|r| r.run_id);

        Self {
            snapshot,
            replayability,
            latest_run_id,
        }
    }

    /// Returns a reference to the underlying snapshot.
    pub fn snapshot(&self) -> &'a RunStoreSnapshot {
        self.snapshot
    }

    /// Returns the replayability map.
    pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
        &self.replayability
    }

    /// Returns the replayability status for a specific run.
    ///
    /// # Panics
    ///
    /// Panics if the run ID is not in the snapshot. This maintains the
    /// invariant that all runs in the snapshot have replayability computed.
    pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
        self.replayability
            .get(&run_id)
            .expect("run ID should be in replayability map")
    }

    /// Returns the ID of the most recent replayable run, if any.
    pub fn latest_run_id(&self) -> Option<ReportUuid> {
        self.latest_run_id
    }
}

#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Creates a snapshot with replayability for testing.
    ///
    /// All runs are marked as [`ReplayabilityStatus::Replayable`] by default.
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let replayability: HashMap<_, _> = snapshot
            .runs()
            .iter()
            .map(|run| (run.run_id, ReplayabilityStatus::Replayable))
            .collect();

        // For tests, latest is just the most recent by time.
        let latest_run_id = snapshot
            .runs()
            .iter()
            .max_by_key(|r| r.started_at)
            .map(|r| r.run_id);

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}

#[cfg(test)]
impl RunStoreSnapshot {
    /// Creates a new snapshot for testing.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        use super::run_id_index::RunIdIndex;

        let run_id_index = RunIdIndex::new(&runs);
        Self {
            runs_dir: Utf8PathBuf::from("/test/runs"),
            runs,
            write_permission: RunsJsonWritePermission::Allowed,
            run_id_index,
        }
    }
}

/// The kind of lock to acquire.
#[derive(Clone, Copy)]
enum LockKind {
    Shared,
    Exclusive,
}

/// Acquires a file lock with retries, timing out after 5 seconds.
///
/// This handles both brief contention (another nextest process finishing up)
/// and filesystems where locking may not work properly (e.g., NFS).
fn acquire_lock_with_retry(
    file: &File,
    lock_file_path: &Utf8Path,
    kind: LockKind,
) -> Result<(), RunStoreError> {
    const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
    const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);

    let start = Instant::now();
    loop {
        let result = match kind {
            LockKind::Shared => file.try_lock_shared(),
            LockKind::Exclusive => file.try_lock(),
        };

        match result {
            Ok(()) => return Ok(()),
            Err(TryLockError::WouldBlock) => {
                // Lock is held by another process. Retry if we haven't timed out.
                if start.elapsed() >= LOCK_TIMEOUT {
                    return Err(RunStoreError::FileLockTimeout {
                        path: lock_file_path.to_owned(),
                        timeout_secs: LOCK_TIMEOUT.as_secs(),
                    });
                }
                thread::sleep(LOCK_RETRY_INTERVAL);
            }
            Err(TryLockError::Error(error)) => {
                // Some other error (e.g., locking not supported on this filesystem).
                return Err(RunStoreError::FileLock {
                    path: lock_file_path.to_owned(),
                    error,
                });
            }
        }
    }
}