// nextest_runner/record/store.rs

// Copyright (c) The nextest Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Run store management for nextest recordings.
//!
//! The run store is a directory that contains all recorded test runs. It provides:
//!
//! - A lock file for exclusive access during modifications.
//! - A zstd-compressed JSON file (`runs.json.zst`) listing all recorded runs.
//! - Individual directories for each run containing the archive and log.
use super::{
    display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
    format::{
        RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission, STORE_FORMAT_VERSION,
        STORE_ZIP_FILE_NAME, StoreFormatVersion, StoreVersionIncompatibility,
        store_format_version_for_new_run,
    },
    recorder::{RunRecorder, StoreSizes},
    retention::{
        PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
    },
    run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
};
use crate::{
    errors::{RunIdResolutionError, RunStoreError},
    helpers::{ThemeCharacters, decimal_char_width},
    redact::Redactor,
};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
use debug_ignore::DebugIgnore;
use quick_junit::ReportUuid;
use semver::Version;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    fmt,
    fs::{File, TryLockError},
    io,
    num::NonZero,
    thread,
    time::{Duration, Instant},
};

/// Name of the lock file used to serialize access to the run store.
static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
/// Name of the zstd-compressed JSON file listing all recorded runs.
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";

48/// A reference to the runs directory in a run store.
49///
50/// This provides methods to compute paths within the runs directory.
51#[derive(Clone, Copy, Debug)]
52pub struct StoreRunsDir<'a>(&'a Utf8Path);
53
54impl<'a> StoreRunsDir<'a> {
55    /// Creates a new `StoreRunsDir` from a path.
56    pub fn new(path: &'a Utf8Path) -> Self {
57        Self(path)
58    }
59
60    /// Returns the path to a specific run's directory.
61    pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
62        self.0.join(run_id.to_string())
63    }
64
65    /// Returns a [`RunFilesExist`] implementation for checking file existence
66    /// for a specific run in this store.
67    pub fn run_files(self, run_id: ReportUuid) -> StoreRunFiles {
68        StoreRunFiles {
69            run_dir: self.run_dir(run_id),
70        }
71    }
72
73    /// Returns the underlying path to the runs directory.
74    pub fn as_path(self) -> &'a Utf8Path {
75        self.0
76    }
77}
78
/// Trait for checking whether required run files exist.
///
/// This abstracts over different storage backends: runs stored on disk in the
/// run store, and other sources that can report file presence.
pub trait RunFilesExist {
    /// Returns true if `store.zip` exists.
    fn store_zip_exists(&self) -> bool;
    /// Returns true if `run.log.zst` exists.
    fn run_log_exists(&self) -> bool;
}

89/// Checks file existence for a run stored on disk.
90///
91/// Created via [`StoreRunsDir::run_files`].
92pub struct StoreRunFiles {
93    run_dir: Utf8PathBuf,
94}
95
96impl RunFilesExist for StoreRunFiles {
97    fn store_zip_exists(&self) -> bool {
98        self.run_dir.join(STORE_ZIP_FILE_NAME).exists()
99    }
100
101    fn run_log_exists(&self) -> bool {
102        self.run_dir.join(RUN_LOG_FILE_NAME).exists()
103    }
104}
105
106/// Manages the storage of recorded test runs.
107///
108/// The run store is a directory containing a list of recorded runs and their data.
109/// Use [`RunStore::lock_exclusive`] to acquire exclusive access before creating
110/// new runs.
111#[derive(Debug)]
112pub struct RunStore {
113    runs_dir: Utf8PathBuf,
114}
115
116impl RunStore {
117    /// Creates a new `RunStore` at the given directory.
118    ///
119    /// Creates the directory if it doesn't exist.
120    pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
121        let runs_dir = store_dir.join("runs");
122        std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
123            run_dir: runs_dir.clone(),
124            error,
125        })?;
126
127        Ok(Self { runs_dir })
128    }
129
130    /// Returns the runs directory.
131    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
132        StoreRunsDir(&self.runs_dir)
133    }
134
135    /// Acquires a shared lock on the run store for reading.
136    ///
137    /// Multiple readers can hold the shared lock simultaneously, but the shared
138    /// lock is exclusive with the exclusive lock (used for writing).
139    ///
140    /// Uses non-blocking lock attempts with retries to handle both brief
141    /// contention and filesystems where locking may not work (e.g., NFS).
142    pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
143        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
144        let file = std::fs::OpenOptions::new()
145            .create(true)
146            .truncate(false)
147            .write(true)
148            .open(&lock_file_path)
149            .map_err(|error| RunStoreError::FileLock {
150                path: lock_file_path.clone(),
151                error,
152            })?;
153
154        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
155        let result = read_runs_json(&self.runs_dir)?;
156        let run_id_index = RunIdIndex::new(&result.runs);
157
158        Ok(SharedLockedRunStore {
159            runs_dir: StoreRunsDir(&self.runs_dir),
160            locked_file: DebugIgnore(file),
161            runs: result.runs,
162            write_permission: result.write_permission,
163            run_id_index,
164        })
165    }
166
167    /// Acquires an exclusive lock on the run store.
168    ///
169    /// This lock should only be held for a short duration (just long enough to
170    /// add a run to the list and create its directory).
171    ///
172    /// Uses non-blocking lock attempts with retries to handle both brief
173    /// contention and filesystems where locking may not work (e.g., NFS).
174    pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
175        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
176        let file = std::fs::OpenOptions::new()
177            .create(true)
178            .truncate(false)
179            .write(true)
180            .open(&lock_file_path)
181            .map_err(|error| RunStoreError::FileLock {
182                path: lock_file_path.clone(),
183                error,
184            })?;
185
186        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
187        let result = read_runs_json(&self.runs_dir)?;
188
189        Ok(ExclusiveLockedRunStore {
190            runs_dir: StoreRunsDir(&self.runs_dir),
191            locked_file: DebugIgnore(file),
192            runs: result.runs,
193            last_pruned_at: result.last_pruned_at,
194            write_permission: result.write_permission,
195        })
196    }
197}
198
/// A run store that has been locked for exclusive access.
///
/// The lifetime parameter ensures this isn't held for longer than the
/// corresponding [`RunStore`].
#[derive(Debug)]
pub struct ExclusiveLockedRunStore<'store> {
    // Path helper for the runs directory; `Copy`, so freely handed out.
    runs_dir: StoreRunsDir<'store>,
    // Held for RAII lock semantics; the lock is released when this struct is dropped.
    #[expect(dead_code)]
    locked_file: DebugIgnore<File>,
    // In-memory copy of the run list read from runs.json.zst under the lock.
    runs: Vec<RecordedRunInfo>,
    // When pruning last ran, if recorded; used to rate-limit implicit pruning.
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest is allowed to write runs.json.zst (denied when the
    // on-disk format version is newer than we support).
    write_permission: RunsJsonWritePermission,
}

214impl<'store> ExclusiveLockedRunStore<'store> {
215    /// Returns the runs directory.
216    pub fn runs_dir(&self) -> StoreRunsDir<'store> {
217        self.runs_dir
218    }
219
220    /// Returns whether this nextest can write to the runs.json.zst file.
221    ///
222    /// If the file has a newer format version than we support, writing is denied.
223    pub fn write_permission(&self) -> RunsJsonWritePermission {
224        self.write_permission
225    }
226
227    /// Marks a run as completed and persists the change to disk.
228    ///
229    /// Updates sizes, `status`, and `duration_secs` to the given values.
230    /// Returns `true` if the run was found and updated, `false` if no run
231    /// with the given ID exists (in which case nothing is persisted).
232    ///
233    /// Returns an error if writing is denied due to a format version mismatch.
234    ///
235    /// The status should not be `Incomplete` since we're completing the run.
236    pub fn complete_run(
237        &mut self,
238        run_id: ReportUuid,
239        sizes: StoreSizes,
240        status: RecordedRunStatus,
241        duration_secs: Option<f64>,
242    ) -> Result<bool, RunStoreError> {
243        if let RunsJsonWritePermission::Denied {
244            file_version,
245            max_supported_version,
246        } = self.write_permission
247        {
248            return Err(RunStoreError::FormatVersionTooNew {
249                file_version,
250                max_supported_version,
251            });
252        }
253
254        let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
255        if found {
256            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
257        }
258        Ok(found)
259    }
260
261    /// Updates a run's metadata in memory.
262    fn mark_run_completed_inner(
263        &mut self,
264        run_id: ReportUuid,
265        sizes: StoreSizes,
266        status: RecordedRunStatus,
267        duration_secs: Option<f64>,
268    ) -> bool {
269        for run in &mut self.runs {
270            if run.run_id == run_id {
271                run.sizes = RecordedSizes {
272                    log: ComponentSizes {
273                        compressed: sizes.log.compressed,
274                        uncompressed: sizes.log.uncompressed,
275                        entries: sizes.log.entries,
276                    },
277                    store: ComponentSizes {
278                        compressed: sizes.store.compressed,
279                        uncompressed: sizes.store.uncompressed,
280                        entries: sizes.store.entries,
281                    },
282                };
283                run.status = status;
284                run.duration_secs = duration_secs;
285                run.last_written_at = Local::now().fixed_offset();
286                return true;
287            }
288        }
289        false
290    }
291
292    /// Prunes runs according to the given retention policy.
293    ///
294    /// This method:
295    /// 1. Determines which runs to delete based on the policy
296    /// 2. Deletes those run directories from disk
297    /// 3. Deletes any orphaned directories not tracked in runs.json.zst
298    /// 4. Updates the run list in memory and on disk
299    ///
300    /// The `kind` parameter indicates whether this is explicit pruning (from a
301    /// user command) or implicit pruning (automatic during recording). This
302    /// affects how errors are displayed.
303    ///
304    /// Returns the result of the pruning operation, including any errors that
305    /// occurred while deleting individual runs.
306    ///
307    /// Returns an error if writing is denied due to a format version mismatch.
308    pub fn prune(
309        &mut self,
310        policy: &RecordRetentionPolicy,
311        kind: PruneKind,
312    ) -> Result<PruneResult, RunStoreError> {
313        if let RunsJsonWritePermission::Denied {
314            file_version,
315            max_supported_version,
316        } = self.write_permission
317        {
318            return Err(RunStoreError::FormatVersionTooNew {
319                file_version,
320                max_supported_version,
321            });
322        }
323
324        let now = Utc::now();
325        let to_delete: HashSet<_> = policy
326            .compute_runs_to_delete(&self.runs, now)
327            .into_iter()
328            .collect();
329
330        let runs_dir = self.runs_dir();
331        let mut result = if to_delete.is_empty() {
332            PruneResult::default()
333        } else {
334            delete_runs(runs_dir, &mut self.runs, &to_delete)
335        };
336        result.kind = kind;
337
338        let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
339        delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);
340
341        if result.deleted_count > 0 || result.orphans_deleted > 0 {
342            // Update last_pruned_at since we performed pruning.
343            self.last_pruned_at = Some(now);
344            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
345        }
346
347        Ok(result)
348    }
349
350    /// Prunes runs if needed, based on time since last prune and limit thresholds.
351    ///
352    /// This method implements implicit pruning, which occurs:
353    /// - If more than 1 day has passed since the last prune, OR
354    /// - If any retention limit is exceeded by 1.5x.
355    ///
356    /// Use [`Self::prune`] for explicit pruning that always runs regardless of these conditions.
357    ///
358    /// Returns `Ok(None)` if pruning was skipped, `Ok(Some(result))` if pruning occurred.
359    pub fn prune_if_needed(
360        &mut self,
361        policy: &RecordRetentionPolicy,
362    ) -> Result<Option<PruneResult>, RunStoreError> {
363        const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
364            Some(d) => d,
365            None => panic!("1 day should always be a valid TimeDelta"),
366        };
367        const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;
368
369        let now = Utc::now();
370
371        // Check if pruning is needed.
372        let time_since_last_prune = self
373            .last_pruned_at
374            .map(|last| now.signed_duration_since(last))
375            .unwrap_or(TimeDelta::MAX);
376
377        let should_prune = time_since_last_prune >= PRUNE_INTERVAL
378            || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);
379
380        if should_prune {
381            Ok(Some(self.prune(policy, PruneKind::Implicit)?))
382        } else {
383            Ok(None)
384        }
385    }
386
387    /// Creates a run recorder for a new run.
388    ///
389    /// Adds the run to the list and creates its directory. Consumes self,
390    /// dropping the exclusive lock.
391    ///
392    /// `max_output_size` specifies the maximum size of a single output (stdout/stderr)
393    /// before truncation.
394    ///
395    /// Returns the recorder and the shortest unique prefix for the run ID (for
396    /// display purposes), or an error if writing is denied due to a format
397    /// version mismatch.
398    #[expect(clippy::too_many_arguments)]
399    pub(crate) fn create_run_recorder(
400        mut self,
401        run_id: ReportUuid,
402        nextest_version: Version,
403        started_at: DateTime<FixedOffset>,
404        cli_args: Vec<String>,
405        build_scope_args: Vec<String>,
406        env_vars: BTreeMap<String, String>,
407        max_output_size: bytesize::ByteSize,
408        parent_run_id: Option<ReportUuid>,
409    ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
410        if let RunsJsonWritePermission::Denied {
411            file_version,
412            max_supported_version,
413        } = self.write_permission
414        {
415            return Err(RunStoreError::FormatVersionTooNew {
416                file_version,
417                max_supported_version,
418            });
419        }
420
421        // Add to the list of runs before creating the directory. This ensures
422        // that if creation fails, an empty run directory isn't left behind. (It
423        // does mean that there may be spurious entries in the list of runs,
424        // which will be dealt with during pruning.)
425
426        let now = Local::now().fixed_offset();
427        let run = RecordedRunInfo {
428            run_id,
429            store_format_version: store_format_version_for_new_run(),
430            nextest_version,
431            started_at,
432            last_written_at: now,
433            duration_secs: None,
434            cli_args,
435            build_scope_args,
436            env_vars,
437            parent_run_id,
438            sizes: RecordedSizes::default(),
439            status: RecordedRunStatus::Incomplete,
440        };
441        self.runs.push(run);
442
443        // If the parent run ID is set, update its last written at time.
444        if let Some(parent_run_id) = parent_run_id
445            && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
446        {
447            parent_run.last_written_at = now;
448        }
449
450        write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
451
452        // Compute the unique prefix now that the run is in the list.
453        let index = RunIdIndex::new(&self.runs);
454        let unique_prefix = index
455            .shortest_unique_prefix(run_id)
456            .expect("run was just added to the list");
457
458        // Create the run directory while still holding the lock. This prevents
459        // a race where another process could prune the newly-added run entry
460        // before the directory exists, leaving an orphaned directory. The lock
461        // is released when `self` is dropped.
462        let run_dir = self.runs_dir().run_dir(run_id);
463
464        let recorder = RunRecorder::new(run_dir, max_output_size)?;
465        Ok((recorder, unique_prefix))
466    }
467}
468
/// Information about a recorded run.
///
/// One entry per run in `runs.json.zst`; serialized via
/// `RecordedRunList` (see `read_runs_json`/`write_runs_json`).
#[derive(Clone, Debug)]
pub struct RecordedRunInfo {
    /// The unique identifier for this run.
    pub run_id: ReportUuid,
    /// The format version of this run's store.zip archive.
    ///
    /// This allows checking replayability without opening the archive.
    pub store_format_version: StoreFormatVersion,
    /// The version of nextest that created this run.
    pub nextest_version: Version,
    /// When the run started.
    pub started_at: DateTime<FixedOffset>,
    /// When this run was last written to.
    ///
    /// Used for LRU eviction. Updated when the run is created, when the run
    /// completes, and when a rerun references this run.
    pub last_written_at: DateTime<FixedOffset>,
    /// Duration of the run in seconds.
    ///
    /// This is `None` for incomplete runs.
    pub duration_secs: Option<f64>,
    /// The command-line arguments used to invoke nextest.
    pub cli_args: Vec<String>,
    /// Build scope arguments (package and target selection).
    ///
    /// These determine which packages and targets are built. In a rerun chain,
    /// these are inherited from the original run unless explicitly overridden.
    pub build_scope_args: Vec<String>,
    /// Environment variables that affect nextest behavior (NEXTEST_* and CARGO_*).
    pub env_vars: BTreeMap<String, String>,
    /// If this is a rerun, the ID of the parent run.
    ///
    /// This forms a chain for iterative fix-and-rerun workflows.
    pub parent_run_id: Option<ReportUuid>,
    /// Sizes broken down by component (log and store).
    pub sizes: RecordedSizes,
    /// The status and statistics for this run.
    pub status: RecordedRunStatus,
}

/// Sizes broken down by component (log and store).
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log (run.log.zst).
    pub log: ComponentSizes,
    /// Sizes for the store archive (store.zip).
    pub store: ComponentSizes,
}

/// Compressed and uncompressed sizes for a single component.
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Compressed size in bytes.
    pub compressed: u64,
    /// Uncompressed size in bytes.
    pub uncompressed: u64,
    /// Number of entries (records for log, files for store).
    pub entries: u64,
}

impl RecordedSizes {
    /// Returns the total compressed size (log + store).
    pub fn total_compressed(&self) -> u64 {
        let Self { log, store } = self;
        log.compressed + store.compressed
    }

    /// Returns the total uncompressed size (log + store).
    pub fn total_uncompressed(&self) -> u64 {
        let Self { log, store } = self;
        log.uncompressed + store.uncompressed
    }

    /// Returns the total number of entries (log records + store files).
    pub fn total_entries(&self) -> u64 {
        let Self { log, store } = self;
        log.entries + store.entries
    }
}

/// Status and statistics for a recorded run.
#[derive(Clone, Debug)]
pub enum RecordedRunStatus {
    /// The run was interrupted before completion.
    Incomplete,
    /// A normal test run completed (all tests finished).
    Completed(CompletedRunStats),
    /// A normal test run was cancelled before all tests finished.
    Cancelled(CompletedRunStats),
    /// A stress test run completed (all iterations finished).
    StressCompleted(StressCompletedRunStats),
    /// A stress test run was cancelled before all iterations finished.
    StressCancelled(StressCompletedRunStats),
    /// An unknown status from a newer version of nextest.
    ///
    /// This variant is used for forward compatibility when reading runs.json.zst
    /// files created by newer nextest versions that may have new status types.
    Unknown,
}

impl RecordedRunStatus {
    /// Returns a short status string for display.
    pub fn short_status_str(&self) -> &'static str {
        match self {
            Self::Completed(_) => "completed",
            Self::Cancelled(_) => "cancelled",
            Self::StressCompleted(_) => "stress completed",
            Self::StressCancelled(_) => "stress cancelled",
            Self::Incomplete => "incomplete",
            Self::Unknown => "unknown",
        }
    }

    /// Returns the exit code for completed runs, or `None` for incomplete/unknown runs.
    pub fn exit_code(&self) -> Option<i32> {
        match self {
            Self::Completed(stats) | Self::Cancelled(stats) => Some(stats.exit_code),
            Self::StressCompleted(stats) | Self::StressCancelled(stats) => Some(stats.exit_code),
            Self::Incomplete | Self::Unknown => None,
        }
    }
}

/// Statistics for a normal test run that finished (completed or cancelled).
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// The number of tests that were expected to run.
    pub initial_run_count: usize,
    /// The number of tests that passed.
    pub passed: usize,
    /// The number of tests that failed (including exec failures and timeouts).
    pub failed: usize,
    /// The exit code from the run.
    pub exit_code: i32,
}

/// Statistics for a stress test run that finished (completed or cancelled).
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// The number of stress iterations that were expected to run, if known.
    ///
    /// This is `None` when the stress test was run without a fixed iteration count
    /// (e.g., `--stress-duration`).
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// The number of stress iterations that succeeded.
    pub success_count: u32,
    /// The number of stress iterations that failed.
    pub failed_count: u32,
    /// The exit code from the run.
    pub exit_code: i32,
}

// ---
// Replayability checking
// ---

/// The result of checking whether a run can be replayed.
///
/// Produced by [`RecordedRunInfo::check_replayability`].
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// The run is definitely replayable.
    ///
    /// No blocking reasons and no uncertain conditions.
    Replayable,
    /// The run is definitely not replayable.
    ///
    /// Contains at least one blocking reason.
    NotReplayable(Vec<NonReplayableReason>),
    /// The run might be replayable but is incomplete.
    ///
    /// The archive might be usable, but we'd need to open `store.zip` to
    /// verify all expected files are present.
    Incomplete,
}

/// A definite reason why a run cannot be replayed.
///
/// Collected into [`ReplayabilityStatus::NotReplayable`]; formatted for users
/// via the `Display` impl below.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run's store format version is incompatible with this nextest version.
    ///
    /// This nextest version cannot read the archive format.
    StoreVersionIncompatible {
        /// The specific incompatibility.
        incompatibility: StoreVersionIncompatibility,
    },
    /// The `store.zip` file is missing from the run directory.
    MissingStoreZip,
    /// The `run.log.zst` file is missing from the run directory.
    MissingRunLog,
    /// The run status is `Unknown` (from a newer nextest version).
    ///
    /// We cannot safely replay since we don't understand the run's state.
    StatusUnknown,
}

661impl fmt::Display for NonReplayableReason {
662    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
663        match self {
664            Self::StoreVersionIncompatible { incompatibility } => {
665                write!(f, "store format version incompatible: {}", incompatibility)
666            }
667            Self::MissingStoreZip => {
668                write!(f, "store.zip is missing")
669            }
670            Self::MissingRunLog => {
671                write!(f, "run.log.zst is missing")
672            }
673            Self::StatusUnknown => {
674                write!(f, "run status is unknown (from a newer nextest version)")
675            }
676        }
677    }
678}
679
/// Result of looking up a run by selector.
#[derive(Clone, Copy, Debug)]
pub struct ResolveRunIdResult {
    /// The run ID.
    pub run_id: ReportUuid,
}

687impl RecordedRunStatus {
688    /// Returns the width (in decimal digits) needed to display the "passed" count.
689    ///
690    /// For non-completed runs (Incomplete, Unknown), returns 0 since they don't
691    /// display a passed count.
692    pub fn passed_count_width(&self) -> usize {
693        match self {
694            Self::Incomplete | Self::Unknown => 0,
695            Self::Completed(stats) | Self::Cancelled(stats) => decimal_char_width(stats.passed),
696            Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
697                decimal_char_width(stats.success_count)
698            }
699        }
700    }
701}
702
703impl RecordedRunInfo {
704    /// Checks whether this run can be replayed.
705    ///
706    /// This performs a comprehensive check of all conditions that might prevent
707    /// replay, including:
708    /// - Store format version compatibility
709    /// - Presence of required files (store.zip, run.log.zst)
710    /// - Run status (unknown, incomplete)
711    ///
712    /// The `files` parameter is used to check for file existence. Use
713    /// [`StoreRunsDir::run_files`] for runs in the store, or pass in a
714    /// [`PortableRecording`](super::PortableRecording) directly.
715    pub fn check_replayability(&self, files: &dyn RunFilesExist) -> ReplayabilityStatus {
716        let mut blocking = Vec::new();
717        let mut is_incomplete = false;
718
719        // Check store format version compatibility.
720        if let Err(incompatibility) = self
721            .store_format_version
722            .check_readable_by(STORE_FORMAT_VERSION)
723        {
724            blocking.push(NonReplayableReason::StoreVersionIncompatible { incompatibility });
725        }
726
727        // Check for required files.
728        if !files.store_zip_exists() {
729            blocking.push(NonReplayableReason::MissingStoreZip);
730        }
731        if !files.run_log_exists() {
732            blocking.push(NonReplayableReason::MissingRunLog);
733        }
734
735        // Check run status.
736        match &self.status {
737            RecordedRunStatus::Unknown => {
738                blocking.push(NonReplayableReason::StatusUnknown);
739            }
740            RecordedRunStatus::Incomplete => {
741                is_incomplete = true;
742            }
743            RecordedRunStatus::Completed(_)
744            | RecordedRunStatus::Cancelled(_)
745            | RecordedRunStatus::StressCompleted(_)
746            | RecordedRunStatus::StressCancelled(_) => {
747                // These statuses are fine for replay.
748            }
749        }
750
751        // Return the appropriate variant based on what we found.
752        if !blocking.is_empty() {
753            ReplayabilityStatus::NotReplayable(blocking)
754        } else if is_incomplete {
755            ReplayabilityStatus::Incomplete
756        } else {
757            ReplayabilityStatus::Replayable
758        }
759    }
760
761    /// Returns a display wrapper for this run.
762    ///
763    /// The `run_id_index` is used for computing shortest unique prefixes,
764    /// which are highlighted differently in the output (similar to jj).
765    ///
766    /// The `alignment` parameter controls column alignment when displaying a
767    /// list of runs. Use [`RunListAlignment::from_runs`] to precompute
768    /// alignment for a set of runs.
769    ///
770    /// The `redactor` parameter, if provided, redacts timestamps, durations,
771    /// and sizes for snapshot testing while preserving column alignment.
772    pub fn display<'a>(
773        &'a self,
774        run_id_index: &'a RunIdIndex,
775        replayability: &'a ReplayabilityStatus,
776        alignment: RunListAlignment,
777        styles: &'a Styles,
778        redactor: &'a Redactor,
779    ) -> DisplayRecordedRunInfo<'a> {
780        DisplayRecordedRunInfo::new(
781            self,
782            run_id_index,
783            replayability,
784            alignment,
785            styles,
786            redactor,
787        )
788    }
789
790    /// Returns a detailed display wrapper for this run.
791    ///
792    /// Unlike [`Self::display`] which shows a compact table row, this provides
793    /// a multi-line detailed view suitable for the `store info` command.
794    ///
795    /// The `replayability` parameter should be computed by the caller using
796    /// [`Self::check_replayability`].
797    ///
798    /// The `now` parameter is the current time, used to compute relative
799    /// durations (e.g. "30s ago").
800    ///
801    /// The `redactor` parameter redacts paths, timestamps, durations, and sizes
802    /// for snapshot testing. Use `Redactor::noop()` if no redaction is needed.
803    pub fn display_detailed<'a>(
804        &'a self,
805        run_id_index: &'a RunIdIndex,
806        replayability: &'a ReplayabilityStatus,
807        now: DateTime<Utc>,
808        styles: &'a Styles,
809        theme_characters: &'a ThemeCharacters,
810        redactor: &'a Redactor,
811    ) -> DisplayRecordedRunInfoDetailed<'a> {
812        DisplayRecordedRunInfoDetailed::new(
813            self,
814            run_id_index,
815            replayability,
816            now,
817            styles,
818            theme_characters,
819            redactor,
820        )
821    }
822}
823
/// Result of reading runs.json.zst.
struct ReadRunsJsonResult {
    // All recorded runs listed in the file (empty if the file didn't exist).
    runs: Vec<RecordedRunInfo>,
    // When pruning last ran, if recorded in the file.
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest may rewrite the file (denied when the on-disk
    // format version is newer than we support).
    write_permission: RunsJsonWritePermission,
}

831/// Reads and deserializes `runs.json.zst`, converting to the internal
832/// representation.
833fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
834    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
835    let file = match File::open(&runs_json_path) {
836        Ok(file) => file,
837        Err(error) => {
838            if error.kind() == io::ErrorKind::NotFound {
839                // The file doesn't exist yet, so we can write a new one.
840                return Ok(ReadRunsJsonResult {
841                    runs: Vec::new(),
842                    last_pruned_at: None,
843                    write_permission: RunsJsonWritePermission::Allowed,
844                });
845            } else {
846                return Err(RunStoreError::RunListRead {
847                    path: runs_json_path,
848                    error,
849                });
850            }
851        }
852    };
853
854    let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
855        path: runs_json_path.clone(),
856        error,
857    })?;
858
859    let list: RecordedRunList =
860        serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
861            path: runs_json_path,
862            error,
863        })?;
864    let write_permission = list.write_permission();
865    let data = list.into_data();
866    Ok(ReadRunsJsonResult {
867        runs: data.runs,
868        last_pruned_at: data.last_pruned_at,
869        write_permission,
870    })
871}
872
873/// Serializes and writes runs.json.zst from internal representation.
874fn write_runs_json(
875    runs_dir: &Utf8Path,
876    runs: &[RecordedRunInfo],
877    last_pruned_at: Option<DateTime<Utc>>,
878) -> Result<(), RunStoreError> {
879    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
880    let list = RecordedRunList::from_data(runs, last_pruned_at);
881
882    atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
883        .write(|file| {
884            // Use compression level 3, consistent with other zstd usage in the crate.
885            let mut encoder = zstd::stream::Encoder::new(file, 3)?;
886            serde_json::to_writer_pretty(&mut encoder, &list)?;
887            encoder.finish()?;
888            Ok(())
889        })
890        .map_err(|error| RunStoreError::RunListWrite {
891            path: runs_json_path,
892            error,
893        })?;
894
895    Ok(())
896}
897
/// A run store that has been locked for shared (read-only) access.
///
/// Multiple readers can hold this lock simultaneously, but it is exclusive
/// with the exclusive lock used for writing.
#[derive(Debug)]
pub struct SharedLockedRunStore<'store> {
    /// The runs directory covered by this lock.
    runs_dir: StoreRunsDir<'store>,
    /// The open lock file; the OS-level lock is held for as long as this
    /// handle stays alive.
    #[expect(dead_code, reason = "held for lock duration")]
    locked_file: DebugIgnore<File>,
    /// The recorded runs loaded from the store.
    runs: Vec<RecordedRunInfo>,
    /// Whether this nextest may rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
    /// Index for resolving run ID prefixes.
    run_id_index: RunIdIndex,
}
911
912impl<'store> SharedLockedRunStore<'store> {
913    /// Returns a snapshot of the runs data, consuming self and releasing the
914    /// lock.
915    pub fn into_snapshot(self) -> RunStoreSnapshot {
916        RunStoreSnapshot {
917            runs_dir: self.runs_dir.as_path().to_owned(),
918            runs: self.runs,
919            write_permission: self.write_permission,
920            run_id_index: self.run_id_index,
921        }
922    }
923}
924
/// A snapshot of run store data.
#[derive(Debug)]
pub struct RunStoreSnapshot {
    /// The directory containing recorded runs.
    runs_dir: Utf8PathBuf,
    /// The recorded runs at the time the snapshot was taken.
    runs: Vec<RecordedRunInfo>,
    /// Whether this nextest may rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
    /// Index for resolving run ID prefixes.
    run_id_index: RunIdIndex,
}
933
934impl RunStoreSnapshot {
935    /// Returns the runs directory.
936    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
937        StoreRunsDir(&self.runs_dir)
938    }
939
940    /// Returns whether this nextest can write to the runs.json.zst file.
941    ///
942    /// If the file has a newer format version than we support, writing is denied.
943    pub fn write_permission(&self) -> RunsJsonWritePermission {
944        self.write_permission
945    }
946
947    /// Returns a list of recorded runs.
948    pub fn runs(&self) -> &[RecordedRunInfo] {
949        &self.runs
950    }
951
952    /// Returns the number of recorded runs.
953    pub fn run_count(&self) -> usize {
954        self.runs.len()
955    }
956
957    /// Returns the total compressed size of all recorded runs in bytes.
958    pub fn total_size(&self) -> u64 {
959        self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
960    }
961
962    /// Resolves a run ID selector to a run result.
963    ///
964    /// For [`RunIdSelector::Latest`], returns the most recent run by start
965    /// time.
966    /// For [`RunIdSelector::Prefix`], resolves the prefix to a specific run.
967    pub fn resolve_run_id(
968        &self,
969        selector: &RunIdSelector,
970    ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
971        match selector {
972            RunIdSelector::Latest => self.most_recent_run(),
973            RunIdSelector::Prefix(prefix) => {
974                let run_id = self.resolve_run_id_prefix(prefix)?;
975                Ok(ResolveRunIdResult { run_id })
976            }
977        }
978    }
979
980    /// Resolves a run ID prefix to a full UUID.
981    ///
982    /// The prefix must be a valid hexadecimal string. If the prefix matches
983    /// exactly one run, that run's UUID is returned. Otherwise, an error is
984    /// returned indicating whether no runs matched or multiple runs matched.
985    fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
986        self.run_id_index.resolve_prefix(prefix).map_err(|err| {
987            match err {
988                PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
989                    prefix: prefix.to_string(),
990                },
991                PrefixResolutionError::Ambiguous { count, candidates } => {
992                    // Convert UUIDs to full RecordedRunInfo and sort by start time (most recent first).
993                    let mut candidates: Vec<_> = candidates
994                        .into_iter()
995                        .filter_map(|run_id| self.get_run(run_id).cloned())
996                        .collect();
997                    candidates.sort_by_key(|run| std::cmp::Reverse(run.started_at));
998                    RunIdResolutionError::Ambiguous {
999                        prefix: prefix.to_string(),
1000                        count,
1001                        candidates,
1002                        run_id_index: self.run_id_index.clone(),
1003                    }
1004                }
1005                PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
1006                    prefix: prefix.to_string(),
1007                },
1008            }
1009        })
1010    }
1011
1012    /// Returns the run ID index for computing shortest unique prefixes.
1013    pub fn run_id_index(&self) -> &RunIdIndex {
1014        &self.run_id_index
1015    }
1016
1017    /// Looks up a run by its exact UUID.
1018    pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
1019        self.runs.iter().find(|r| r.run_id == run_id)
1020    }
1021
1022    /// Returns the most recent run by start time.
1023    ///
1024    /// Returns an error if there are no runs at all.
1025    pub fn most_recent_run(&self) -> Result<ResolveRunIdResult, RunIdResolutionError> {
1026        self.runs
1027            .iter()
1028            .max_by_key(|r| r.started_at)
1029            .map(|r| ResolveRunIdResult { run_id: r.run_id })
1030            .ok_or(RunIdResolutionError::NoRuns)
1031    }
1032
1033    /// Computes which runs would be deleted by a prune operation.
1034    ///
1035    /// This is used for dry-run mode to show what would be deleted without
1036    /// actually deleting anything. Returns a [`PrunePlan`] containing the runs
1037    /// that would be deleted, sorted by start time (oldest first).
1038    pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
1039        PrunePlan::compute(&self.runs, policy)
1040    }
1041}
1042
/// A snapshot paired with precomputed replayability status for all runs.
///
/// This struct maintains the invariant that every run in the snapshot has a
/// corresponding entry in the replayability map. Use [`Self::new`] to compute
/// replayability for all runs, or `Self::new_for_test` for testing.
#[derive(Debug)]
pub struct SnapshotWithReplayability<'a> {
    /// The snapshot these statuses were computed from.
    snapshot: &'a RunStoreSnapshot,
    /// Replayability status for every run in `snapshot` (invariant: complete).
    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
    /// The most recent run by start time, if any runs exist.
    latest_run_id: Option<ReportUuid>,
}
1054
1055impl<'a> SnapshotWithReplayability<'a> {
1056    /// Creates a new snapshot with replayability by checking all runs.
1057    ///
1058    /// This computes [`ReplayabilityStatus`] for each run by checking file
1059    /// existence and format versions.
1060    pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
1061        let runs_dir = snapshot.runs_dir();
1062        let replayability: HashMap<_, _> = snapshot
1063            .runs()
1064            .iter()
1065            .map(|run| {
1066                (
1067                    run.run_id,
1068                    run.check_replayability(&runs_dir.run_files(run.run_id)),
1069                )
1070            })
1071            .collect();
1072
1073        // Find the latest run by time.
1074        let latest_run_id = snapshot.most_recent_run().ok().map(|r| r.run_id);
1075
1076        Self {
1077            snapshot,
1078            replayability,
1079            latest_run_id,
1080        }
1081    }
1082
1083    /// Returns a reference to the underlying snapshot.
1084    pub fn snapshot(&self) -> &'a RunStoreSnapshot {
1085        self.snapshot
1086    }
1087
1088    /// Returns the replayability map.
1089    pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
1090        &self.replayability
1091    }
1092
1093    /// Returns the replayability status for a specific run.
1094    ///
1095    /// # Panics
1096    ///
1097    /// Panics if the run ID is not in the snapshot. This maintains the
1098    /// invariant that all runs in the snapshot have replayability computed.
1099    pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
1100        self.replayability
1101            .get(&run_id)
1102            .expect("run ID should be in replayability map")
1103    }
1104
1105    /// Returns the ID of the most recent run by start time, if any.
1106    pub fn latest_run_id(&self) -> Option<ReportUuid> {
1107        self.latest_run_id
1108    }
1109}
1110
#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Creates a snapshot with replayability for testing.
    ///
    /// All runs are marked as [`ReplayabilityStatus::Replayable`] by default.
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let mut replayability = HashMap::new();
        for run in snapshot.runs() {
            replayability.insert(run.run_id, ReplayabilityStatus::Replayable);
        }

        // For tests, latest is just the most recent by time.
        let latest_run_id = snapshot
            .runs()
            .iter()
            .max_by_key(|run| run.started_at)
            .map(|run| run.run_id);

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}
1137
#[cfg(test)]
impl RunStoreSnapshot {
    /// Creates a new snapshot for testing, rooted at a fixed fake directory.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        use super::run_id_index::RunIdIndex;

        // Build the prefix index before `runs` is moved into the struct.
        Self {
            run_id_index: RunIdIndex::new(&runs),
            runs_dir: Utf8PathBuf::from("/test/runs"),
            runs,
            write_permission: RunsJsonWritePermission::Allowed,
        }
    }
}
1153
/// The kind of lock to acquire.
#[derive(Clone, Copy)]
enum LockKind {
    /// A shared (read) lock; multiple holders may coexist.
    Shared,
    /// An exclusive (write) lock; only one holder at a time.
    Exclusive,
}
1160
1161/// Acquires a file lock with retries, timing out after 5 seconds.
1162///
1163/// This handles both brief contention (another nextest process finishing up)
1164/// and filesystems where locking may not work properly (e.g., NFS).
1165fn acquire_lock_with_retry(
1166    file: &File,
1167    lock_file_path: &Utf8Path,
1168    kind: LockKind,
1169) -> Result<(), RunStoreError> {
1170    const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
1171    const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1172
1173    let start = Instant::now();
1174    loop {
1175        let result = match kind {
1176            LockKind::Shared => file.try_lock_shared(),
1177            LockKind::Exclusive => file.try_lock(),
1178        };
1179
1180        match result {
1181            Ok(()) => return Ok(()),
1182            Err(TryLockError::WouldBlock) => {
1183                // Lock is held by another process. Retry if we haven't timed out.
1184                if start.elapsed() >= LOCK_TIMEOUT {
1185                    return Err(RunStoreError::FileLockTimeout {
1186                        path: lock_file_path.to_owned(),
1187                        timeout_secs: LOCK_TIMEOUT.as_secs(),
1188                    });
1189                }
1190                thread::sleep(LOCK_RETRY_INTERVAL);
1191            }
1192            Err(TryLockError::Error(error)) => {
1193                // Some other error (e.g., locking not supported on this filesystem).
1194                return Err(RunStoreError::FileLock {
1195                    path: lock_file_path.to_owned(),
1196                    error,
1197                });
1198            }
1199        }
1200    }
1201}