1use super::{
13 display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
14 format::{
15 RECORD_FORMAT_VERSION, RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission,
16 STORE_ZIP_FILE_NAME,
17 },
18 recorder::{RunRecorder, StoreSizes},
19 retention::{
20 PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
21 },
22 run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
23};
24use crate::{
25 errors::{RunIdResolutionError, RunStoreError},
26 helpers::{ThemeCharacters, u32_decimal_char_width, usize_decimal_char_width},
27 redact::Redactor,
28};
29use camino::{Utf8Path, Utf8PathBuf};
30use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
31use debug_ignore::DebugIgnore;
32use quick_junit::ReportUuid;
33use semver::Version;
34use std::{
35 collections::{BTreeMap, HashMap, HashSet},
36 fmt,
37 fs::{File, TryLockError},
38 io,
39 num::NonZero,
40 thread,
41 time::{Duration, Instant},
42};
43
/// Name of the lock file used to coordinate shared/exclusive access to the
/// runs directory.
static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
/// Name of the zstd-compressed JSON file that stores the recorded run list.
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";
46
47#[derive(Clone, Copy, Debug)]
51pub struct StoreRunsDir<'a>(&'a Utf8Path);
52
53impl<'a> StoreRunsDir<'a> {
54 pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
56 self.0.join(run_id.to_string())
57 }
58
59 pub fn as_path(self) -> &'a Utf8Path {
61 self.0
62 }
63}
64
/// Top-level handle to the on-disk run store.
///
/// Wraps the `runs` subdirectory of the store directory; obtain read or
/// write access via [`RunStore::lock_shared`] / [`RunStore::lock_exclusive`].
#[derive(Debug)]
pub struct RunStore {
    // Path to the `runs` directory, created by `RunStore::new`.
    runs_dir: Utf8PathBuf,
}
74
75impl RunStore {
76 pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
80 let runs_dir = store_dir.join("runs");
81 std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
82 run_dir: runs_dir.clone(),
83 error,
84 })?;
85
86 Ok(Self { runs_dir })
87 }
88
89 pub fn runs_dir(&self) -> StoreRunsDir<'_> {
91 StoreRunsDir(&self.runs_dir)
92 }
93
94 pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
102 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
103 let file = std::fs::OpenOptions::new()
104 .create(true)
105 .truncate(false)
106 .write(true)
107 .open(&lock_file_path)
108 .map_err(|error| RunStoreError::FileLock {
109 path: lock_file_path.clone(),
110 error,
111 })?;
112
113 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
114 let result = read_runs_json(&self.runs_dir)?;
115 let run_id_index = RunIdIndex::new(&result.runs);
116
117 Ok(SharedLockedRunStore {
118 runs_dir: StoreRunsDir(&self.runs_dir),
119 locked_file: DebugIgnore(file),
120 runs: result.runs,
121 write_permission: result.write_permission,
122 run_id_index,
123 })
124 }
125
126 pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
134 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
135 let file = std::fs::OpenOptions::new()
136 .create(true)
137 .truncate(false)
138 .write(true)
139 .open(&lock_file_path)
140 .map_err(|error| RunStoreError::FileLock {
141 path: lock_file_path.clone(),
142 error,
143 })?;
144
145 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
146 let result = read_runs_json(&self.runs_dir)?;
147
148 Ok(ExclusiveLockedRunStore {
149 runs_dir: StoreRunsDir(&self.runs_dir),
150 locked_file: DebugIgnore(file),
151 runs: result.runs,
152 last_pruned_at: result.last_pruned_at,
153 write_permission: result.write_permission,
154 })
155 }
156}
157
158#[derive(Debug)]
163pub struct ExclusiveLockedRunStore<'store> {
164 runs_dir: StoreRunsDir<'store>,
165 #[expect(dead_code)]
167 locked_file: DebugIgnore<File>,
168 runs: Vec<RecordedRunInfo>,
169 last_pruned_at: Option<DateTime<Utc>>,
170 write_permission: RunsJsonWritePermission,
171}
172
173impl<'store> ExclusiveLockedRunStore<'store> {
174 pub fn runs_dir(&self) -> StoreRunsDir<'store> {
176 self.runs_dir
177 }
178
179 pub fn write_permission(&self) -> RunsJsonWritePermission {
183 self.write_permission
184 }
185
186 pub fn complete_run(
196 &mut self,
197 run_id: ReportUuid,
198 sizes: StoreSizes,
199 status: RecordedRunStatus,
200 duration_secs: Option<f64>,
201 ) -> Result<bool, RunStoreError> {
202 if let RunsJsonWritePermission::Denied {
203 file_version,
204 max_supported_version,
205 } = self.write_permission
206 {
207 return Err(RunStoreError::FormatVersionTooNew {
208 file_version,
209 max_supported_version,
210 });
211 }
212
213 let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
214 if found {
215 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
216 }
217 Ok(found)
218 }
219
220 fn mark_run_completed_inner(
222 &mut self,
223 run_id: ReportUuid,
224 sizes: StoreSizes,
225 status: RecordedRunStatus,
226 duration_secs: Option<f64>,
227 ) -> bool {
228 for run in &mut self.runs {
229 if run.run_id == run_id {
230 run.sizes = RecordedSizes {
231 log: ComponentSizes {
232 compressed: sizes.log.compressed,
233 uncompressed: sizes.log.uncompressed,
234 entries: sizes.log.entries,
235 },
236 store: ComponentSizes {
237 compressed: sizes.store.compressed,
238 uncompressed: sizes.store.uncompressed,
239 entries: sizes.store.entries,
240 },
241 };
242 run.status = status;
243 run.duration_secs = duration_secs;
244 run.last_written_at = Local::now().fixed_offset();
245 return true;
246 }
247 }
248 false
249 }
250
251 pub fn prune(
268 &mut self,
269 policy: &RecordRetentionPolicy,
270 kind: PruneKind,
271 ) -> Result<PruneResult, RunStoreError> {
272 if let RunsJsonWritePermission::Denied {
273 file_version,
274 max_supported_version,
275 } = self.write_permission
276 {
277 return Err(RunStoreError::FormatVersionTooNew {
278 file_version,
279 max_supported_version,
280 });
281 }
282
283 let now = Utc::now();
284 let to_delete: HashSet<_> = policy
285 .compute_runs_to_delete(&self.runs, now)
286 .into_iter()
287 .collect();
288
289 let runs_dir = self.runs_dir();
290 let mut result = if to_delete.is_empty() {
291 PruneResult::default()
292 } else {
293 delete_runs(runs_dir, &mut self.runs, &to_delete)
294 };
295 result.kind = kind;
296
297 let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
298 delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);
299
300 if result.deleted_count > 0 || result.orphans_deleted > 0 {
301 self.last_pruned_at = Some(now);
303 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
304 }
305
306 Ok(result)
307 }
308
309 pub fn prune_if_needed(
319 &mut self,
320 policy: &RecordRetentionPolicy,
321 ) -> Result<Option<PruneResult>, RunStoreError> {
322 const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
323 Some(d) => d,
324 None => panic!("1 day should always be a valid TimeDelta"),
325 };
326 const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;
327
328 let now = Utc::now();
329
330 let time_since_last_prune = self
332 .last_pruned_at
333 .map(|last| now.signed_duration_since(last))
334 .unwrap_or(TimeDelta::MAX);
335
336 let should_prune = time_since_last_prune >= PRUNE_INTERVAL
337 || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);
338
339 if should_prune {
340 Ok(Some(self.prune(policy, PruneKind::Implicit)?))
341 } else {
342 Ok(None)
343 }
344 }
345
346 #[expect(clippy::too_many_arguments)]
358 pub(crate) fn create_run_recorder(
359 mut self,
360 run_id: ReportUuid,
361 nextest_version: Version,
362 started_at: DateTime<FixedOffset>,
363 cli_args: Vec<String>,
364 build_scope_args: Vec<String>,
365 env_vars: BTreeMap<String, String>,
366 max_output_size: bytesize::ByteSize,
367 parent_run_id: Option<ReportUuid>,
368 ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
369 if let RunsJsonWritePermission::Denied {
370 file_version,
371 max_supported_version,
372 } = self.write_permission
373 {
374 return Err(RunStoreError::FormatVersionTooNew {
375 file_version,
376 max_supported_version,
377 });
378 }
379
380 let now = Local::now().fixed_offset();
386 let run = RecordedRunInfo {
387 run_id,
388 store_format_version: RECORD_FORMAT_VERSION,
389 nextest_version,
390 started_at,
391 last_written_at: now,
392 duration_secs: None,
393 cli_args,
394 build_scope_args,
395 env_vars,
396 parent_run_id,
397 sizes: RecordedSizes::default(),
398 status: RecordedRunStatus::Incomplete,
399 };
400 self.runs.push(run);
401
402 if let Some(parent_run_id) = parent_run_id
404 && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
405 {
406 parent_run.last_written_at = now;
407 }
408
409 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
410
411 let index = RunIdIndex::new(&self.runs);
413 let unique_prefix = index
414 .shortest_unique_prefix(run_id)
415 .expect("run was just added to the list");
416
417 let run_dir = self.runs_dir().run_dir(run_id);
422
423 let recorder = RunRecorder::new(run_dir, max_output_size)?;
424 Ok((recorder, unique_prefix))
425 }
426}
427
/// Information about a single recorded run, as stored in runs.json.
#[derive(Clone, Debug)]
pub struct RecordedRunInfo {
    /// Unique identifier for this run.
    pub run_id: ReportUuid,
    /// The record format version in effect when the run was created
    /// (`RECORD_FORMAT_VERSION` at write time).
    pub store_format_version: u32,
    /// The nextest version that recorded this run.
    pub nextest_version: Version,
    /// When the run started.
    pub started_at: DateTime<FixedOffset>,
    /// When this entry was last written; updated on completion, and on a
    /// parent run when a child run referencing it is created.
    pub last_written_at: DateTime<FixedOffset>,
    /// Total run duration in seconds; `None` until the run completes.
    pub duration_secs: Option<f64>,
    /// Command-line arguments associated with the run.
    pub cli_args: Vec<String>,
    /// Arguments defining the build scope.
    /// NOTE(review): exact semantics defined by the recorder -- confirm there.
    pub build_scope_args: Vec<String>,
    /// Environment variables captured for the run.
    pub env_vars: BTreeMap<String, String>,
    /// The run this one was derived from, if any.
    pub parent_run_id: Option<ReportUuid>,
    /// On-disk sizes of the run's recorded components.
    pub sizes: RecordedSizes,
    /// Completion status; initialized to `Incomplete` at creation.
    pub status: RecordedRunStatus,
}
468
/// Recorded on-disk sizes for a run, split by component.
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log component.
    pub log: ComponentSizes,
    /// Sizes for the store component.
    pub store: ComponentSizes,
}

/// Size accounting for a single component of a recorded run.
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Compressed size in bytes.
    pub compressed: u64,
    /// Uncompressed size in bytes.
    pub uncompressed: u64,
    /// Number of entries in the component.
    pub entries: u64,
}

impl RecordedSizes {
    /// Total compressed bytes across both components.
    pub fn total_compressed(&self) -> u64 {
        let Self { log, store } = self;
        log.compressed + store.compressed
    }

    /// Total uncompressed bytes across both components.
    pub fn total_uncompressed(&self) -> u64 {
        let Self { log, store } = self;
        log.uncompressed + store.uncompressed
    }

    /// Total entry count across both components.
    pub fn total_entries(&self) -> u64 {
        let Self { log, store } = self;
        log.entries + store.entries
    }
}
505
506#[derive(Clone, Debug)]
508pub enum RecordedRunStatus {
509 Incomplete,
511 Completed(CompletedRunStats),
513 Cancelled(CompletedRunStats),
515 StressCompleted(StressCompletedRunStats),
517 StressCancelled(StressCompletedRunStats),
519 Unknown,
524}
525
526impl RecordedRunStatus {
527 pub fn short_status_str(&self) -> &'static str {
529 match self {
530 Self::Incomplete => "incomplete",
531 Self::Unknown => "unknown",
532 Self::Completed(_) => "completed",
533 Self::Cancelled(_) => "cancelled",
534 Self::StressCompleted(_) => "stress completed",
535 Self::StressCancelled(_) => "stress cancelled",
536 }
537 }
538
539 pub fn exit_code(&self) -> Option<i32> {
541 match self {
542 Self::Incomplete | Self::Unknown => None,
543 Self::Completed(stats) | Self::Cancelled(stats) => Some(stats.exit_code),
544 Self::StressCompleted(stats) | Self::StressCancelled(stats) => Some(stats.exit_code),
545 }
546 }
547}
548
/// Final statistics for a completed or cancelled (non-stress) run.
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// Number of tests initially scheduled to run.
    pub initial_run_count: usize,
    /// Number of tests that passed.
    pub passed: usize,
    /// Number of tests that failed.
    pub failed: usize,
    /// Exit code reported for the run.
    pub exit_code: i32,
}
561
/// Final statistics for a completed or cancelled stress run.
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// Number of iterations initially requested.
    /// NOTE(review): `None` presumably means no fixed count (run until
    /// stopped) -- confirm against the stress-run driver.
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// Number of iterations that succeeded.
    pub success_count: u32,
    /// Number of iterations that failed.
    pub failed_count: u32,
    /// Exit code reported for the run.
    pub exit_code: i32,
}
577
/// Whether a recorded run can be replayed.
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// All required files are present and the run reached a final status.
    Replayable,
    /// Replay is blocked for one or more reasons.
    NotReplayable(Vec<NonReplayableReason>),
    /// Required files are present, but the run never recorded a final
    /// status (e.g. it crashed or is still in progress).
    Incomplete,
}
599
/// A reason why a recorded run cannot be replayed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run was written with a store format newer than this binary
    /// supports.
    StoreFormatTooNew {
        /// The format version recorded for the run.
        run_version: u32,
        /// The maximum format version this binary supports.
        max_supported: u32,
    },
    /// The run's store.zip file is missing on disk.
    MissingStoreZip,
    /// The run's run.log.zst file is missing on disk.
    MissingRunLog,
    /// The run status was written by a newer nextest and is unrecognized.
    StatusUnknown,
}

impl fmt::Display for NonReplayableReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::StoreFormatTooNew {
                run_version,
                max_supported,
            } => write!(
                f,
                "store format version {run_version} is newer than supported \
                 (version {max_supported})"
            ),
            Self::MissingStoreZip => f.write_str("store.zip is missing"),
            Self::MissingRunLog => f.write_str("run.log.zst is missing"),
            Self::StatusUnknown => {
                f.write_str("run status is unknown (from a newer nextest version)")
            }
        }
    }
}
647
/// The outcome of resolving a run ID selector.
#[derive(Clone, Copy, Debug)]
pub struct ResolveRunIdResult {
    /// The resolved run ID.
    pub run_id: ReportUuid,
    /// The number of more recent runs that were skipped because they were
    /// not replayable; always 0 when resolving by explicit prefix.
    pub newer_non_replayable_count: usize,
}
656
657impl RecordedRunStatus {
658 pub fn passed_count_width(&self) -> usize {
663 match self {
664 Self::Incomplete | Self::Unknown => 0,
665 Self::Completed(stats) | Self::Cancelled(stats) => {
666 usize_decimal_char_width(stats.passed)
667 }
668 Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
669 u32_decimal_char_width(stats.success_count)
671 }
672 }
673 }
674}
675
676impl RecordedRunInfo {
677 pub fn check_replayability(&self, runs_dir: StoreRunsDir<'_>) -> ReplayabilityStatus {
687 let mut blocking = Vec::new();
688 let mut is_incomplete = false;
689
690 if self.store_format_version > RECORD_FORMAT_VERSION {
692 blocking.push(NonReplayableReason::StoreFormatTooNew {
693 run_version: self.store_format_version,
694 max_supported: RECORD_FORMAT_VERSION,
695 });
696 }
697 let run_dir = runs_dir.run_dir(self.run_id);
702 let store_zip_path = run_dir.join(STORE_ZIP_FILE_NAME);
703 let run_log_path = run_dir.join(RUN_LOG_FILE_NAME);
704
705 if !store_zip_path.exists() {
706 blocking.push(NonReplayableReason::MissingStoreZip);
707 }
708 if !run_log_path.exists() {
709 blocking.push(NonReplayableReason::MissingRunLog);
710 }
711
712 match &self.status {
714 RecordedRunStatus::Unknown => {
715 blocking.push(NonReplayableReason::StatusUnknown);
716 }
717 RecordedRunStatus::Incomplete => {
718 is_incomplete = true;
719 }
720 RecordedRunStatus::Completed(_)
721 | RecordedRunStatus::Cancelled(_)
722 | RecordedRunStatus::StressCompleted(_)
723 | RecordedRunStatus::StressCancelled(_) => {
724 }
726 }
727
728 if !blocking.is_empty() {
730 ReplayabilityStatus::NotReplayable(blocking)
731 } else if is_incomplete {
732 ReplayabilityStatus::Incomplete
733 } else {
734 ReplayabilityStatus::Replayable
735 }
736 }
737
738 pub fn display<'a>(
750 &'a self,
751 run_id_index: &'a RunIdIndex,
752 replayability: &'a ReplayabilityStatus,
753 alignment: RunListAlignment,
754 styles: &'a Styles,
755 redactor: &'a Redactor,
756 ) -> DisplayRecordedRunInfo<'a> {
757 DisplayRecordedRunInfo::new(
758 self,
759 run_id_index,
760 replayability,
761 alignment,
762 styles,
763 redactor,
764 )
765 }
766
767 pub fn display_detailed<'a>(
781 &'a self,
782 run_id_index: &'a RunIdIndex,
783 replayability: &'a ReplayabilityStatus,
784 now: DateTime<Utc>,
785 styles: &'a Styles,
786 theme_characters: &'a ThemeCharacters,
787 redactor: &'a Redactor,
788 ) -> DisplayRecordedRunInfoDetailed<'a> {
789 DisplayRecordedRunInfoDetailed::new(
790 self,
791 run_id_index,
792 replayability,
793 now,
794 styles,
795 theme_characters,
796 redactor,
797 )
798 }
799}
800
/// Data read back from runs.json, or defaults when the file is absent.
struct ReadRunsJsonResult {
    // All recorded runs.
    runs: Vec<RecordedRunInfo>,
    // When the store was last pruned, if ever.
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest version may rewrite the file.
    write_permission: RunsJsonWritePermission,
}
807
808fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
811 let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
812 let file = match File::open(&runs_json_path) {
813 Ok(file) => file,
814 Err(error) => {
815 if error.kind() == io::ErrorKind::NotFound {
816 return Ok(ReadRunsJsonResult {
818 runs: Vec::new(),
819 last_pruned_at: None,
820 write_permission: RunsJsonWritePermission::Allowed,
821 });
822 } else {
823 return Err(RunStoreError::RunListRead {
824 path: runs_json_path,
825 error,
826 });
827 }
828 }
829 };
830
831 let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
832 path: runs_json_path.clone(),
833 error,
834 })?;
835
836 let list: RecordedRunList =
837 serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
838 path: runs_json_path,
839 error,
840 })?;
841 let write_permission = list.write_permission();
842 let data = list.into_data();
843 Ok(ReadRunsJsonResult {
844 runs: data.runs,
845 last_pruned_at: data.last_pruned_at,
846 write_permission,
847 })
848}
849
/// Serializes the run list to runs.json.zst.
///
/// Uses an atomic write so readers never observe a partially-written file
/// (per the atomicwrites crate's write-then-rename contract).
fn write_runs_json(
    runs_dir: &Utf8Path,
    runs: &[RecordedRunInfo],
    last_pruned_at: Option<DateTime<Utc>>,
) -> Result<(), RunStoreError> {
    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
    let list = RecordedRunList::from_data(runs, last_pruned_at);

    atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
        .write(|file| {
            // zstd compression level 3: a moderate speed/size tradeoff.
            let mut encoder = zstd::stream::Encoder::new(file, 3)?;
            serde_json::to_writer_pretty(&mut encoder, &list)?;
            // finish() flushes the zstd frame; without it the output is
            // truncated.
            encoder.finish()?;
            Ok(())
        })
        .map_err(|error| RunStoreError::RunListWrite {
            path: runs_json_path,
            error,
        })?;

    Ok(())
}
874
/// A run store holding a shared (read) lock.
///
/// Multiple shared lock holders may coexist; none may mutate the run list.
#[derive(Debug)]
pub struct SharedLockedRunStore<'store> {
    // The runs directory this store operates on.
    runs_dir: StoreRunsDir<'store>,
    #[expect(dead_code, reason = "held for lock duration")]
    locked_file: DebugIgnore<File>,
    // Run list as read from runs.json under the lock.
    runs: Vec<RecordedRunInfo>,
    // Whether this nextest version may rewrite runs.json.
    write_permission: RunsJsonWritePermission,
    // Prefix-resolution index built over `runs`.
    run_id_index: RunIdIndex,
}
888
889impl<'store> SharedLockedRunStore<'store> {
890 pub fn into_snapshot(self) -> RunStoreSnapshot {
893 RunStoreSnapshot {
894 runs_dir: self.runs_dir.as_path().to_owned(),
895 runs: self.runs,
896 write_permission: self.write_permission,
897 run_id_index: self.run_id_index,
898 }
899 }
900}
901
/// An owned, lock-free snapshot of the run store's run list.
///
/// Produced by [`SharedLockedRunStore::into_snapshot`]; the data reflects
/// the store at the moment the shared lock was held.
#[derive(Debug)]
pub struct RunStoreSnapshot {
    // Owned copy of the runs directory path.
    runs_dir: Utf8PathBuf,
    // All recorded runs at snapshot time.
    runs: Vec<RecordedRunInfo>,
    // Whether this nextest version may rewrite runs.json.
    write_permission: RunsJsonWritePermission,
    // Prefix-resolution index built over `runs`.
    run_id_index: RunIdIndex,
}
910
impl RunStoreSnapshot {
    /// Returns a borrowed handle to the runs directory.
    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
        StoreRunsDir(&self.runs_dir)
    }

    /// Returns whether this nextest version may rewrite runs.json.
    pub fn write_permission(&self) -> RunsJsonWritePermission {
        self.write_permission
    }

    /// Returns all recorded runs, in stored order.
    pub fn runs(&self) -> &[RecordedRunInfo] {
        &self.runs
    }

    /// Returns the number of recorded runs.
    pub fn run_count(&self) -> usize {
        self.runs.len()
    }

    /// Returns the total compressed size across all runs, in bytes.
    pub fn total_size(&self) -> u64 {
        self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
    }

    /// Resolves a selector to a concrete run ID: either the most recent
    /// replayable run, or a run matched by a unique ID prefix.
    pub fn resolve_run_id(
        &self,
        selector: &RunIdSelector,
    ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
        match selector {
            RunIdSelector::Latest => self.most_recent_run(None),
            RunIdSelector::Prefix(prefix) => {
                let run_id = self.resolve_run_id_prefix(prefix)?;
                Ok(ResolveRunIdResult {
                    run_id,
                    // Prefix resolution does not skip non-replayable runs.
                    newer_non_replayable_count: 0,
                })
            }
        }
    }

    /// Resolves a run ID prefix, translating index errors into user-facing
    /// resolution errors (attaching newest-first candidates on ambiguity).
    fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
        self.run_id_index.resolve_prefix(prefix).map_err(|err| {
            match err {
                PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
                    prefix: prefix.to_string(),
                },
                PrefixResolutionError::Ambiguous { count, candidates } => {
                    // Materialize the candidate run infos, newest first, so
                    // the error message can show them meaningfully.
                    let mut candidates: Vec<_> = candidates
                        .into_iter()
                        .filter_map(|run_id| self.get_run(run_id).cloned())
                        .collect();
                    candidates.sort_by(|a, b| b.started_at.cmp(&a.started_at));
                    RunIdResolutionError::Ambiguous {
                        prefix: prefix.to_string(),
                        count,
                        candidates,
                        run_id_index: self.run_id_index.clone(),
                    }
                }
                PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
                    prefix: prefix.to_string(),
                },
            }
        })
    }

    /// Returns the prefix-resolution index over run IDs.
    pub fn run_id_index(&self) -> &RunIdIndex {
        &self.run_id_index
    }

    /// Looks up a run by exact ID.
    pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
        self.runs.iter().find(|r| r.run_id == run_id)
    }

    /// Returns the most recent replayable run, counting newer runs that
    /// were skipped for being non-replayable.
    ///
    /// When a precomputed `replayability` map is provided it must contain
    /// every run in the snapshot.
    ///
    /// # Panics
    ///
    /// Panics if `replayability` is `Some` but is missing a run ID present
    /// in the snapshot.
    pub fn most_recent_run(
        &self,
        replayability: Option<&HashMap<ReportUuid, ReplayabilityStatus>>,
    ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
        if self.runs.is_empty() {
            return Err(RunIdResolutionError::NoRuns);
        }

        // Walk runs newest-first so the first replayable one wins.
        let mut sorted_runs: Vec<_> = self.runs.iter().collect();
        sorted_runs.sort_by(|a, b| b.started_at.cmp(&a.started_at));

        let runs_dir = self.runs_dir();
        let mut newer_non_replayable_count = 0;
        for run in sorted_runs {
            let is_replayable = match replayability {
                Some(replayability) => {
                    let replayability = replayability.get(&run.run_id).unwrap_or_else(|| {
                        panic!("replayability index should have run ID {}", run.run_id)
                    });
                    matches!(replayability, &ReplayabilityStatus::Replayable)
                }
                None => {
                    // No precomputed map: check the filesystem directly.
                    matches!(
                        run.check_replayability(runs_dir),
                        ReplayabilityStatus::Replayable
                    )
                }
            };
            if is_replayable {
                return Ok(ResolveRunIdResult {
                    run_id: run.run_id,
                    newer_non_replayable_count,
                });
            }
            newer_non_replayable_count += 1;
        }

        Err(RunIdResolutionError::NoReplayableRuns {
            non_replayable_count: newer_non_replayable_count,
        })
    }

    /// Computes which runs `policy` would delete, without deleting anything.
    pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
        PrunePlan::compute(&self.runs, policy)
    }
}
1072
/// A [`RunStoreSnapshot`] paired with a precomputed replayability status
/// for every run, plus the latest replayable run ID (if any).
#[derive(Debug)]
pub struct SnapshotWithReplayability<'a> {
    // The underlying snapshot.
    snapshot: &'a RunStoreSnapshot,
    // Replayability status for every run in the snapshot.
    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
    // Most recent replayable run, or None if there is none.
    latest_run_id: Option<ReportUuid>,
}
1084
1085impl<'a> SnapshotWithReplayability<'a> {
1086 pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
1091 let runs_dir = snapshot.runs_dir();
1092 let replayability: HashMap<_, _> = snapshot
1093 .runs()
1094 .iter()
1095 .map(|run| (run.run_id, run.check_replayability(runs_dir)))
1096 .collect();
1097
1098 let latest_run_id = snapshot
1100 .most_recent_run(Some(&replayability))
1101 .ok()
1102 .map(|r| r.run_id);
1103
1104 Self {
1105 snapshot,
1106 replayability,
1107 latest_run_id,
1108 }
1109 }
1110
1111 pub fn snapshot(&self) -> &'a RunStoreSnapshot {
1113 self.snapshot
1114 }
1115
1116 pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
1118 &self.replayability
1119 }
1120
1121 pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
1128 self.replayability
1129 .get(&run_id)
1130 .expect("run ID should be in replayability map")
1131 }
1132
1133 pub fn latest_run_id(&self) -> Option<ReportUuid> {
1135 self.latest_run_id
1136 }
1137}
1138
#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Test-only constructor that marks every run replayable, skipping the
    /// filesystem checks `new` performs; the latest run is simply the one
    /// with the newest start time.
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let replayability: HashMap<_, _> = snapshot
            .runs()
            .iter()
            .map(|run| (run.run_id, ReplayabilityStatus::Replayable))
            .collect();

        let latest_run_id = snapshot
            .runs()
            .iter()
            .max_by_key(|r| r.started_at)
            .map(|r| r.run_id);

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}
1165
#[cfg(test)]
impl RunStoreSnapshot {
    /// Test-only constructor that builds a snapshot directly from a run
    /// list, using a fixed fake runs directory and allowing writes.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        use super::run_id_index::RunIdIndex;

        let run_id_index = RunIdIndex::new(&runs);
        Self {
            runs_dir: Utf8PathBuf::from("/test/runs"),
            runs,
            write_permission: RunsJsonWritePermission::Allowed,
            run_id_index,
        }
    }
}
1181
/// The kind of file lock to acquire on the runs lock file.
#[derive(Clone, Copy)]
enum LockKind {
    // Shared (read) lock; multiple holders allowed.
    Shared,
    // Exclusive (write) lock; a single holder.
    Exclusive,
}
1188
1189fn acquire_lock_with_retry(
1194 file: &File,
1195 lock_file_path: &Utf8Path,
1196 kind: LockKind,
1197) -> Result<(), RunStoreError> {
1198 const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
1199 const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1200
1201 let start = Instant::now();
1202 loop {
1203 let result = match kind {
1204 LockKind::Shared => file.try_lock_shared(),
1205 LockKind::Exclusive => file.try_lock(),
1206 };
1207
1208 match result {
1209 Ok(()) => return Ok(()),
1210 Err(TryLockError::WouldBlock) => {
1211 if start.elapsed() >= LOCK_TIMEOUT {
1213 return Err(RunStoreError::FileLockTimeout {
1214 path: lock_file_path.to_owned(),
1215 timeout_secs: LOCK_TIMEOUT.as_secs(),
1216 });
1217 }
1218 thread::sleep(LOCK_RETRY_INTERVAL);
1219 }
1220 Err(TryLockError::Error(error)) => {
1221 return Err(RunStoreError::FileLock {
1223 path: lock_file_path.to_owned(),
1224 error,
1225 });
1226 }
1227 }
1228 }
1229}