1use super::{
13 display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
14 format::{
15 RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission, STORE_FORMAT_VERSION,
16 STORE_ZIP_FILE_NAME, StoreFormatVersion, StoreVersionIncompatibility,
17 },
18 recorder::{RunRecorder, StoreSizes},
19 retention::{
20 PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
21 },
22 run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
23};
24use crate::{
25 errors::{RunIdResolutionError, RunStoreError},
26 helpers::{ThemeCharacters, decimal_char_width},
27 redact::Redactor,
28};
29use camino::{Utf8Path, Utf8PathBuf};
30use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
31use debug_ignore::DebugIgnore;
32use quick_junit::ReportUuid;
33use semver::Version;
34use std::{
35 collections::{BTreeMap, HashMap, HashSet},
36 fmt,
37 fs::{File, TryLockError},
38 io,
39 num::NonZero,
40 thread,
41 time::{Duration, Instant},
42};
43
/// Name of the lock file used to serialize access to the runs directory.
static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
/// Name of the zstd-compressed JSON file holding the recorded run list.
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";
46
/// A borrowed reference to the directory that contains one subdirectory per
/// recorded run.
#[derive(Clone, Copy, Debug)]
pub struct StoreRunsDir<'a>(&'a Utf8Path);
52
53impl<'a> StoreRunsDir<'a> {
54 pub fn new(path: &'a Utf8Path) -> Self {
56 Self(path)
57 }
58
59 pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
61 self.0.join(run_id.to_string())
62 }
63
64 pub fn run_files(self, run_id: ReportUuid) -> StoreRunFiles {
67 StoreRunFiles {
68 run_dir: self.run_dir(run_id),
69 }
70 }
71
72 pub fn as_path(self) -> &'a Utf8Path {
74 self.0
75 }
76}
77
/// Abstraction over "do this run's on-disk artifacts exist?", used by
/// replayability checks (and mockable in tests).
pub trait RunFilesExist {
    /// Returns true if the run's store zip file exists.
    fn store_zip_exists(&self) -> bool;
    /// Returns true if the run's log file exists.
    fn run_log_exists(&self) -> bool;
}
87
/// Filesystem-backed implementation of [`RunFilesExist`] for a single run's
/// directory.
pub struct StoreRunFiles {
    // Directory holding this run's artifacts (see `StoreRunsDir::run_dir`).
    run_dir: Utf8PathBuf,
}
94
95impl RunFilesExist for StoreRunFiles {
96 fn store_zip_exists(&self) -> bool {
97 self.run_dir.join(STORE_ZIP_FILE_NAME).exists()
98 }
99
100 fn run_log_exists(&self) -> bool {
101 self.run_dir.join(RUN_LOG_FILE_NAME).exists()
102 }
103}
104
/// Entry point to the on-disk run store. Holds the path to the `runs`
/// directory; all reads/writes go through a shared or exclusive lock
/// acquired via [`RunStore::lock_shared`] / [`RunStore::lock_exclusive`].
#[derive(Debug)]
pub struct RunStore {
    runs_dir: Utf8PathBuf,
}
114
115impl RunStore {
116 pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
120 let runs_dir = store_dir.join("runs");
121 std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
122 run_dir: runs_dir.clone(),
123 error,
124 })?;
125
126 Ok(Self { runs_dir })
127 }
128
129 pub fn runs_dir(&self) -> StoreRunsDir<'_> {
131 StoreRunsDir(&self.runs_dir)
132 }
133
134 pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
142 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
143 let file = std::fs::OpenOptions::new()
144 .create(true)
145 .truncate(false)
146 .write(true)
147 .open(&lock_file_path)
148 .map_err(|error| RunStoreError::FileLock {
149 path: lock_file_path.clone(),
150 error,
151 })?;
152
153 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
154 let result = read_runs_json(&self.runs_dir)?;
155 let run_id_index = RunIdIndex::new(&result.runs);
156
157 Ok(SharedLockedRunStore {
158 runs_dir: StoreRunsDir(&self.runs_dir),
159 locked_file: DebugIgnore(file),
160 runs: result.runs,
161 write_permission: result.write_permission,
162 run_id_index,
163 })
164 }
165
166 pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
174 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
175 let file = std::fs::OpenOptions::new()
176 .create(true)
177 .truncate(false)
178 .write(true)
179 .open(&lock_file_path)
180 .map_err(|error| RunStoreError::FileLock {
181 path: lock_file_path.clone(),
182 error,
183 })?;
184
185 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
186 let result = read_runs_json(&self.runs_dir)?;
187
188 Ok(ExclusiveLockedRunStore {
189 runs_dir: StoreRunsDir(&self.runs_dir),
190 locked_file: DebugIgnore(file),
191 runs: result.runs,
192 last_pruned_at: result.last_pruned_at,
193 write_permission: result.write_permission,
194 })
195 }
196}
197
198#[derive(Debug)]
203pub struct ExclusiveLockedRunStore<'store> {
204 runs_dir: StoreRunsDir<'store>,
205 #[expect(dead_code)]
207 locked_file: DebugIgnore<File>,
208 runs: Vec<RecordedRunInfo>,
209 last_pruned_at: Option<DateTime<Utc>>,
210 write_permission: RunsJsonWritePermission,
211}
212
213impl<'store> ExclusiveLockedRunStore<'store> {
214 pub fn runs_dir(&self) -> StoreRunsDir<'store> {
216 self.runs_dir
217 }
218
219 pub fn write_permission(&self) -> RunsJsonWritePermission {
223 self.write_permission
224 }
225
226 pub fn complete_run(
236 &mut self,
237 run_id: ReportUuid,
238 sizes: StoreSizes,
239 status: RecordedRunStatus,
240 duration_secs: Option<f64>,
241 ) -> Result<bool, RunStoreError> {
242 if let RunsJsonWritePermission::Denied {
243 file_version,
244 max_supported_version,
245 } = self.write_permission
246 {
247 return Err(RunStoreError::FormatVersionTooNew {
248 file_version,
249 max_supported_version,
250 });
251 }
252
253 let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
254 if found {
255 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
256 }
257 Ok(found)
258 }
259
260 fn mark_run_completed_inner(
262 &mut self,
263 run_id: ReportUuid,
264 sizes: StoreSizes,
265 status: RecordedRunStatus,
266 duration_secs: Option<f64>,
267 ) -> bool {
268 for run in &mut self.runs {
269 if run.run_id == run_id {
270 run.sizes = RecordedSizes {
271 log: ComponentSizes {
272 compressed: sizes.log.compressed,
273 uncompressed: sizes.log.uncompressed,
274 entries: sizes.log.entries,
275 },
276 store: ComponentSizes {
277 compressed: sizes.store.compressed,
278 uncompressed: sizes.store.uncompressed,
279 entries: sizes.store.entries,
280 },
281 };
282 run.status = status;
283 run.duration_secs = duration_secs;
284 run.last_written_at = Local::now().fixed_offset();
285 return true;
286 }
287 }
288 false
289 }
290
291 pub fn prune(
308 &mut self,
309 policy: &RecordRetentionPolicy,
310 kind: PruneKind,
311 ) -> Result<PruneResult, RunStoreError> {
312 if let RunsJsonWritePermission::Denied {
313 file_version,
314 max_supported_version,
315 } = self.write_permission
316 {
317 return Err(RunStoreError::FormatVersionTooNew {
318 file_version,
319 max_supported_version,
320 });
321 }
322
323 let now = Utc::now();
324 let to_delete: HashSet<_> = policy
325 .compute_runs_to_delete(&self.runs, now)
326 .into_iter()
327 .collect();
328
329 let runs_dir = self.runs_dir();
330 let mut result = if to_delete.is_empty() {
331 PruneResult::default()
332 } else {
333 delete_runs(runs_dir, &mut self.runs, &to_delete)
334 };
335 result.kind = kind;
336
337 let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
338 delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);
339
340 if result.deleted_count > 0 || result.orphans_deleted > 0 {
341 self.last_pruned_at = Some(now);
343 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
344 }
345
346 Ok(result)
347 }
348
349 pub fn prune_if_needed(
359 &mut self,
360 policy: &RecordRetentionPolicy,
361 ) -> Result<Option<PruneResult>, RunStoreError> {
362 const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
363 Some(d) => d,
364 None => panic!("1 day should always be a valid TimeDelta"),
365 };
366 const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;
367
368 let now = Utc::now();
369
370 let time_since_last_prune = self
372 .last_pruned_at
373 .map(|last| now.signed_duration_since(last))
374 .unwrap_or(TimeDelta::MAX);
375
376 let should_prune = time_since_last_prune >= PRUNE_INTERVAL
377 || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);
378
379 if should_prune {
380 Ok(Some(self.prune(policy, PruneKind::Implicit)?))
381 } else {
382 Ok(None)
383 }
384 }
385
386 #[expect(clippy::too_many_arguments)]
398 pub(crate) fn create_run_recorder(
399 mut self,
400 run_id: ReportUuid,
401 nextest_version: Version,
402 started_at: DateTime<FixedOffset>,
403 cli_args: Vec<String>,
404 build_scope_args: Vec<String>,
405 env_vars: BTreeMap<String, String>,
406 max_output_size: bytesize::ByteSize,
407 parent_run_id: Option<ReportUuid>,
408 ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
409 if let RunsJsonWritePermission::Denied {
410 file_version,
411 max_supported_version,
412 } = self.write_permission
413 {
414 return Err(RunStoreError::FormatVersionTooNew {
415 file_version,
416 max_supported_version,
417 });
418 }
419
420 let now = Local::now().fixed_offset();
426 let run = RecordedRunInfo {
427 run_id,
428 store_format_version: STORE_FORMAT_VERSION,
429 nextest_version,
430 started_at,
431 last_written_at: now,
432 duration_secs: None,
433 cli_args,
434 build_scope_args,
435 env_vars,
436 parent_run_id,
437 sizes: RecordedSizes::default(),
438 status: RecordedRunStatus::Incomplete,
439 };
440 self.runs.push(run);
441
442 if let Some(parent_run_id) = parent_run_id
444 && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
445 {
446 parent_run.last_written_at = now;
447 }
448
449 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
450
451 let index = RunIdIndex::new(&self.runs);
453 let unique_prefix = index
454 .shortest_unique_prefix(run_id)
455 .expect("run was just added to the list");
456
457 let run_dir = self.runs_dir().run_dir(run_id);
462
463 let recorder = RunRecorder::new(run_dir, max_output_size)?;
464 Ok((recorder, unique_prefix))
465 }
466}
467
/// Metadata about one recorded run, as stored in runs.json.
#[derive(Clone, Debug)]
pub struct RecordedRunInfo {
    /// Unique ID of this run.
    pub run_id: ReportUuid,
    /// Store format version the run was written with; checked against
    /// `STORE_FORMAT_VERSION` when determining replayability.
    pub store_format_version: StoreFormatVersion,
    /// Version of nextest that produced the run.
    pub nextest_version: Version,
    /// When the run started.
    pub started_at: DateTime<FixedOffset>,
    /// Last time this record was written (updated on completion, and on a
    /// child run being recorded against this run).
    pub last_written_at: DateTime<FixedOffset>,
    /// Run duration in seconds, if known (set when the run is completed).
    pub duration_secs: Option<f64>,
    /// Command-line arguments the run was invoked with.
    pub cli_args: Vec<String>,
    /// Arguments defining the build scope of the run.
    pub build_scope_args: Vec<String>,
    /// Environment variables captured for the run.
    pub env_vars: BTreeMap<String, String>,
    /// Parent run this one derives from, if any.
    pub parent_run_id: Option<ReportUuid>,
    /// On-disk sizes of the run's log and store components.
    pub sizes: RecordedSizes,
    /// Completion status of the run.
    pub status: RecordedRunStatus,
}
508
/// Sizes of a run's two on-disk components.
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log.
    pub log: ComponentSizes,
    /// Sizes for the store zip.
    pub store: ComponentSizes,
}
517
/// Compressed/uncompressed size and entry count for a single component.
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Size on disk after compression, in bytes.
    pub compressed: u64,
    /// Size before compression, in bytes.
    pub uncompressed: u64,
    /// Number of entries in the component.
    pub entries: u64,
}
528
529impl RecordedSizes {
530 pub fn total_compressed(&self) -> u64 {
532 self.log.compressed + self.store.compressed
533 }
534
535 pub fn total_uncompressed(&self) -> u64 {
537 self.log.uncompressed + self.store.uncompressed
538 }
539
540 pub fn total_entries(&self) -> u64 {
542 self.log.entries + self.store.entries
543 }
544}
545
/// Completion status of a recorded run.
#[derive(Clone, Debug)]
pub enum RecordedRunStatus {
    /// The run was started but has not (yet) been marked complete.
    Incomplete,
    /// The run completed normally.
    Completed(CompletedRunStats),
    /// The run was cancelled.
    Cancelled(CompletedRunStats),
    /// A stress run that completed normally.
    StressCompleted(StressCompletedRunStats),
    /// A stress run that was cancelled.
    StressCancelled(StressCompletedRunStats),
    /// Status could not be interpreted — written by a newer nextest version.
    Unknown,
}
565
566impl RecordedRunStatus {
567 pub fn short_status_str(&self) -> &'static str {
569 match self {
570 Self::Incomplete => "incomplete",
571 Self::Unknown => "unknown",
572 Self::Completed(_) => "completed",
573 Self::Cancelled(_) => "cancelled",
574 Self::StressCompleted(_) => "stress completed",
575 Self::StressCancelled(_) => "stress cancelled",
576 }
577 }
578
579 pub fn exit_code(&self) -> Option<i32> {
581 match self {
582 Self::Incomplete | Self::Unknown => None,
583 Self::Completed(stats) | Self::Cancelled(stats) => Some(stats.exit_code),
584 Self::StressCompleted(stats) | Self::StressCancelled(stats) => Some(stats.exit_code),
585 }
586 }
587}
588
/// Summary statistics for a completed (or cancelled) regular run.
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// Number of tests scheduled at the start of the run.
    pub initial_run_count: usize,
    /// Number of tests that passed.
    pub passed: usize,
    /// Number of tests that failed.
    pub failed: usize,
    /// Process exit code of the run.
    pub exit_code: i32,
}
601
/// Summary statistics for a completed (or cancelled) stress run.
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// Planned iteration count; `None` presumably means unbounded — TODO
    /// confirm against the stress-run caller.
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// Number of successful iterations.
    pub success_count: u32,
    /// Number of failed iterations.
    pub failed_count: u32,
    /// Process exit code of the run.
    pub exit_code: i32,
}
617
/// Whether a recorded run can be replayed.
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// The run is complete and all artifacts are present and readable.
    Replayable,
    /// The run cannot be replayed for the listed (non-empty) reasons.
    NotReplayable(Vec<NonReplayableReason>),
    /// The run never finished; artifacts exist but the run is incomplete.
    Incomplete,
}
639
/// A single reason a run cannot be replayed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run's store format version cannot be read by this nextest.
    StoreVersionIncompatible {
        /// Details about the version mismatch.
        incompatibility: StoreVersionIncompatibility,
    },
    /// The store zip file is missing from the run directory.
    MissingStoreZip,
    /// The run log file is missing from the run directory.
    MissingRunLog,
    /// The run status was recorded by a newer nextest and is unknown here.
    StatusUnknown,
}
659
660impl fmt::Display for NonReplayableReason {
661 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662 match self {
663 Self::StoreVersionIncompatible { incompatibility } => {
664 write!(f, "store format version incompatible: {}", incompatibility)
665 }
666 Self::MissingStoreZip => {
667 write!(f, "store.zip is missing")
668 }
669 Self::MissingRunLog => {
670 write!(f, "run.log.zst is missing")
671 }
672 Self::StatusUnknown => {
673 write!(f, "run status is unknown (from a newer nextest version)")
674 }
675 }
676 }
677}
678
/// Result of resolving a run-ID selector to a concrete run.
#[derive(Clone, Copy, Debug)]
pub struct ResolveRunIdResult {
    /// The resolved run ID.
    pub run_id: ReportUuid,
}
685
686impl RecordedRunStatus {
687 pub fn passed_count_width(&self) -> usize {
692 match self {
693 Self::Incomplete | Self::Unknown => 0,
694 Self::Completed(stats) | Self::Cancelled(stats) => decimal_char_width(stats.passed),
695 Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
696 decimal_char_width(stats.success_count)
697 }
698 }
699 }
700}
701
702impl RecordedRunInfo {
703 pub fn check_replayability(&self, files: &dyn RunFilesExist) -> ReplayabilityStatus {
715 let mut blocking = Vec::new();
716 let mut is_incomplete = false;
717
718 if let Err(incompatibility) = self
720 .store_format_version
721 .check_readable_by(STORE_FORMAT_VERSION)
722 {
723 blocking.push(NonReplayableReason::StoreVersionIncompatible { incompatibility });
724 }
725
726 if !files.store_zip_exists() {
728 blocking.push(NonReplayableReason::MissingStoreZip);
729 }
730 if !files.run_log_exists() {
731 blocking.push(NonReplayableReason::MissingRunLog);
732 }
733
734 match &self.status {
736 RecordedRunStatus::Unknown => {
737 blocking.push(NonReplayableReason::StatusUnknown);
738 }
739 RecordedRunStatus::Incomplete => {
740 is_incomplete = true;
741 }
742 RecordedRunStatus::Completed(_)
743 | RecordedRunStatus::Cancelled(_)
744 | RecordedRunStatus::StressCompleted(_)
745 | RecordedRunStatus::StressCancelled(_) => {
746 }
748 }
749
750 if !blocking.is_empty() {
752 ReplayabilityStatus::NotReplayable(blocking)
753 } else if is_incomplete {
754 ReplayabilityStatus::Incomplete
755 } else {
756 ReplayabilityStatus::Replayable
757 }
758 }
759
760 pub fn display<'a>(
772 &'a self,
773 run_id_index: &'a RunIdIndex,
774 replayability: &'a ReplayabilityStatus,
775 alignment: RunListAlignment,
776 styles: &'a Styles,
777 redactor: &'a Redactor,
778 ) -> DisplayRecordedRunInfo<'a> {
779 DisplayRecordedRunInfo::new(
780 self,
781 run_id_index,
782 replayability,
783 alignment,
784 styles,
785 redactor,
786 )
787 }
788
789 pub fn display_detailed<'a>(
803 &'a self,
804 run_id_index: &'a RunIdIndex,
805 replayability: &'a ReplayabilityStatus,
806 now: DateTime<Utc>,
807 styles: &'a Styles,
808 theme_characters: &'a ThemeCharacters,
809 redactor: &'a Redactor,
810 ) -> DisplayRecordedRunInfoDetailed<'a> {
811 DisplayRecordedRunInfoDetailed::new(
812 self,
813 run_id_index,
814 replayability,
815 now,
816 styles,
817 theme_characters,
818 redactor,
819 )
820 }
821}
822
/// Data read from runs.json by `read_runs_json`.
struct ReadRunsJsonResult {
    // The recorded runs.
    runs: Vec<RecordedRunInfo>,
    // When the store was last pruned, if ever.
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest may write the file back.
    write_permission: RunsJsonWritePermission,
}
829
830fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
833 let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
834 let file = match File::open(&runs_json_path) {
835 Ok(file) => file,
836 Err(error) => {
837 if error.kind() == io::ErrorKind::NotFound {
838 return Ok(ReadRunsJsonResult {
840 runs: Vec::new(),
841 last_pruned_at: None,
842 write_permission: RunsJsonWritePermission::Allowed,
843 });
844 } else {
845 return Err(RunStoreError::RunListRead {
846 path: runs_json_path,
847 error,
848 });
849 }
850 }
851 };
852
853 let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
854 path: runs_json_path.clone(),
855 error,
856 })?;
857
858 let list: RecordedRunList =
859 serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
860 path: runs_json_path,
861 error,
862 })?;
863 let write_permission = list.write_permission();
864 let data = list.into_data();
865 Ok(ReadRunsJsonResult {
866 runs: data.runs,
867 last_pruned_at: data.last_pruned_at,
868 write_permission,
869 })
870}
871
872fn write_runs_json(
874 runs_dir: &Utf8Path,
875 runs: &[RecordedRunInfo],
876 last_pruned_at: Option<DateTime<Utc>>,
877) -> Result<(), RunStoreError> {
878 let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
879 let list = RecordedRunList::from_data(runs, last_pruned_at);
880
881 atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
882 .write(|file| {
883 let mut encoder = zstd::stream::Encoder::new(file, 3)?;
885 serde_json::to_writer_pretty(&mut encoder, &list)?;
886 encoder.finish()?;
887 Ok(())
888 })
889 .map_err(|error| RunStoreError::RunListWrite {
890 path: runs_json_path,
891 error,
892 })?;
893
894 Ok(())
895}
896
/// A run store held under a shared (read) lock. Provides read-only access;
/// convert to an owned [`RunStoreSnapshot`] (releasing the lock) via
/// `into_snapshot`.
#[derive(Debug)]
pub struct SharedLockedRunStore<'store> {
    runs_dir: StoreRunsDir<'store>,
    #[expect(dead_code, reason = "held for lock duration")]
    locked_file: DebugIgnore<File>,
    runs: Vec<RecordedRunInfo>,
    write_permission: RunsJsonWritePermission,
    // Prefix index over `runs`, built at lock time.
    run_id_index: RunIdIndex,
}
910
911impl<'store> SharedLockedRunStore<'store> {
912 pub fn into_snapshot(self) -> RunStoreSnapshot {
915 RunStoreSnapshot {
916 runs_dir: self.runs_dir.as_path().to_owned(),
917 runs: self.runs,
918 write_permission: self.write_permission,
919 run_id_index: self.run_id_index,
920 }
921 }
922}
923
/// An owned, point-in-time snapshot of the run store, valid after the lock
/// has been released.
#[derive(Debug)]
pub struct RunStoreSnapshot {
    runs_dir: Utf8PathBuf,
    runs: Vec<RecordedRunInfo>,
    write_permission: RunsJsonWritePermission,
    // Prefix index over `runs`, carried over from the locked store.
    run_id_index: RunIdIndex,
}
932
933impl RunStoreSnapshot {
934 pub fn runs_dir(&self) -> StoreRunsDir<'_> {
936 StoreRunsDir(&self.runs_dir)
937 }
938
939 pub fn write_permission(&self) -> RunsJsonWritePermission {
943 self.write_permission
944 }
945
946 pub fn runs(&self) -> &[RecordedRunInfo] {
948 &self.runs
949 }
950
951 pub fn run_count(&self) -> usize {
953 self.runs.len()
954 }
955
956 pub fn total_size(&self) -> u64 {
958 self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
959 }
960
961 pub fn resolve_run_id(
967 &self,
968 selector: &RunIdSelector,
969 ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
970 match selector {
971 RunIdSelector::Latest => self.most_recent_run(),
972 RunIdSelector::Prefix(prefix) => {
973 let run_id = self.resolve_run_id_prefix(prefix)?;
974 Ok(ResolveRunIdResult { run_id })
975 }
976 }
977 }
978
979 fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
985 self.run_id_index.resolve_prefix(prefix).map_err(|err| {
986 match err {
987 PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
988 prefix: prefix.to_string(),
989 },
990 PrefixResolutionError::Ambiguous { count, candidates } => {
991 let mut candidates: Vec<_> = candidates
993 .into_iter()
994 .filter_map(|run_id| self.get_run(run_id).cloned())
995 .collect();
996 candidates.sort_by(|a, b| b.started_at.cmp(&a.started_at));
997 RunIdResolutionError::Ambiguous {
998 prefix: prefix.to_string(),
999 count,
1000 candidates,
1001 run_id_index: self.run_id_index.clone(),
1002 }
1003 }
1004 PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
1005 prefix: prefix.to_string(),
1006 },
1007 }
1008 })
1009 }
1010
1011 pub fn run_id_index(&self) -> &RunIdIndex {
1013 &self.run_id_index
1014 }
1015
1016 pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
1018 self.runs.iter().find(|r| r.run_id == run_id)
1019 }
1020
1021 pub fn most_recent_run(&self) -> Result<ResolveRunIdResult, RunIdResolutionError> {
1025 self.runs
1026 .iter()
1027 .max_by_key(|r| r.started_at)
1028 .map(|r| ResolveRunIdResult { run_id: r.run_id })
1029 .ok_or(RunIdResolutionError::NoRuns)
1030 }
1031
1032 pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
1038 PrunePlan::compute(&self.runs, policy)
1039 }
1040}
1041
/// A [`RunStoreSnapshot`] augmented with a precomputed replayability status
/// for every run, plus the latest run's ID.
#[derive(Debug)]
pub struct SnapshotWithReplayability<'a> {
    snapshot: &'a RunStoreSnapshot,
    // Replayability per run ID; has an entry for every run in the snapshot.
    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
    // ID of the most recently started run, if any runs exist.
    latest_run_id: Option<ReportUuid>,
}
1053
1054impl<'a> SnapshotWithReplayability<'a> {
1055 pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
1060 let runs_dir = snapshot.runs_dir();
1061 let replayability: HashMap<_, _> = snapshot
1062 .runs()
1063 .iter()
1064 .map(|run| {
1065 (
1066 run.run_id,
1067 run.check_replayability(&runs_dir.run_files(run.run_id)),
1068 )
1069 })
1070 .collect();
1071
1072 let latest_run_id = snapshot.most_recent_run().ok().map(|r| r.run_id);
1074
1075 Self {
1076 snapshot,
1077 replayability,
1078 latest_run_id,
1079 }
1080 }
1081
1082 pub fn snapshot(&self) -> &'a RunStoreSnapshot {
1084 self.snapshot
1085 }
1086
1087 pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
1089 &self.replayability
1090 }
1091
1092 pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
1099 self.replayability
1100 .get(&run_id)
1101 .expect("run ID should be in replayability map")
1102 }
1103
1104 pub fn latest_run_id(&self) -> Option<ReportUuid> {
1106 self.latest_run_id
1107 }
1108}
1109
#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Test constructor: marks every run `Replayable` without touching the
    /// filesystem.
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let mut replayability = HashMap::new();
        for run in snapshot.runs() {
            replayability.insert(run.run_id, ReplayabilityStatus::Replayable);
        }

        let latest_run_id = snapshot
            .runs()
            .iter()
            .max_by_key(|run| run.started_at)
            .map(|run| run.run_id);

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}
1136
#[cfg(test)]
impl RunStoreSnapshot {
    /// Test constructor: builds a snapshot from `runs` with a fixed fake
    /// path and write permission allowed.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        use super::run_id_index::RunIdIndex;

        let run_id_index = RunIdIndex::new(&runs);
        RunStoreSnapshot {
            runs_dir: Utf8PathBuf::from("/test/runs"),
            write_permission: RunsJsonWritePermission::Allowed,
            run_id_index,
            runs,
        }
    }
}
1152
/// Which kind of file lock to acquire on the runs lock file.
#[derive(Clone, Copy)]
enum LockKind {
    /// Shared (read) lock; multiple readers may hold it concurrently.
    Shared,
    /// Exclusive (write) lock.
    Exclusive,
}
1159
1160fn acquire_lock_with_retry(
1165 file: &File,
1166 lock_file_path: &Utf8Path,
1167 kind: LockKind,
1168) -> Result<(), RunStoreError> {
1169 const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
1170 const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1171
1172 let start = Instant::now();
1173 loop {
1174 let result = match kind {
1175 LockKind::Shared => file.try_lock_shared(),
1176 LockKind::Exclusive => file.try_lock(),
1177 };
1178
1179 match result {
1180 Ok(()) => return Ok(()),
1181 Err(TryLockError::WouldBlock) => {
1182 if start.elapsed() >= LOCK_TIMEOUT {
1184 return Err(RunStoreError::FileLockTimeout {
1185 path: lock_file_path.to_owned(),
1186 timeout_secs: LOCK_TIMEOUT.as_secs(),
1187 });
1188 }
1189 thread::sleep(LOCK_RETRY_INTERVAL);
1190 }
1191 Err(TryLockError::Error(error)) => {
1192 return Err(RunStoreError::FileLock {
1194 path: lock_file_path.to_owned(),
1195 error,
1196 });
1197 }
1198 }
1199 }
1200}