1use super::{
13 display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
14 format::{
15 RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission, STORE_FORMAT_VERSION,
16 STORE_ZIP_FILE_NAME, StoreFormatVersion, StoreVersionIncompatibility,
17 store_format_version_for_new_run,
18 },
19 recorder::{RunRecorder, StoreSizes},
20 retention::{
21 PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
22 },
23 run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
24};
25use crate::{
26 errors::{RunIdResolutionError, RunStoreError},
27 helpers::{ThemeCharacters, decimal_char_width},
28 redact::Redactor,
29};
30use camino::{Utf8Path, Utf8PathBuf};
31use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
32use debug_ignore::DebugIgnore;
33use quick_junit::ReportUuid;
34use semver::Version;
35use std::{
36 collections::{BTreeMap, HashMap, HashSet},
37 fmt,
38 fs::{File, TryLockError},
39 io,
40 num::NonZero,
41 thread,
42 time::{Duration, Instant},
43};
44
/// Name of the lock file used to serialize access to the runs directory.
static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
/// Name of the zstd-compressed JSON file that holds the recorded run list.
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";
47
/// A borrowed handle to the directory that contains one subdirectory per
/// recorded run.
#[derive(Clone, Copy, Debug)]
pub struct StoreRunsDir<'a>(&'a Utf8Path);
53
54impl<'a> StoreRunsDir<'a> {
55 pub fn new(path: &'a Utf8Path) -> Self {
57 Self(path)
58 }
59
60 pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
62 self.0.join(run_id.to_string())
63 }
64
65 pub fn run_files(self, run_id: ReportUuid) -> StoreRunFiles {
68 StoreRunFiles {
69 run_dir: self.run_dir(run_id),
70 }
71 }
72
73 pub fn as_path(self) -> &'a Utf8Path {
75 self.0
76 }
77}
78
/// Existence checks for the on-disk files that make up a recorded run.
pub trait RunFilesExist {
    /// Returns true if the store zip file exists in the run directory.
    fn store_zip_exists(&self) -> bool;
    /// Returns true if the run log file exists in the run directory.
    fn run_log_exists(&self) -> bool;
}
88
/// Filesystem-backed [`RunFilesExist`] implementation for a single run
/// directory.
pub struct StoreRunFiles {
    // Directory holding this run's files (store zip and run log).
    run_dir: Utf8PathBuf,
}
95
96impl RunFilesExist for StoreRunFiles {
97 fn store_zip_exists(&self) -> bool {
98 self.run_dir.join(STORE_ZIP_FILE_NAME).exists()
99 }
100
101 fn run_log_exists(&self) -> bool {
102 self.run_dir.join(RUN_LOG_FILE_NAME).exists()
103 }
104}
105
/// The on-disk store of recorded runs, rooted at `<store dir>/runs`.
///
/// Access is serialized through a lock file; see [`RunStore::lock_shared`]
/// and [`RunStore::lock_exclusive`].
#[derive(Debug)]
pub struct RunStore {
    // The `runs` subdirectory of the store directory.
    runs_dir: Utf8PathBuf,
}
115
116impl RunStore {
117 pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
121 let runs_dir = store_dir.join("runs");
122 std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
123 run_dir: runs_dir.clone(),
124 error,
125 })?;
126
127 Ok(Self { runs_dir })
128 }
129
130 pub fn runs_dir(&self) -> StoreRunsDir<'_> {
132 StoreRunsDir(&self.runs_dir)
133 }
134
135 pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
143 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
144 let file = std::fs::OpenOptions::new()
145 .create(true)
146 .truncate(false)
147 .write(true)
148 .open(&lock_file_path)
149 .map_err(|error| RunStoreError::FileLock {
150 path: lock_file_path.clone(),
151 error,
152 })?;
153
154 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
155 let result = read_runs_json(&self.runs_dir)?;
156 let run_id_index = RunIdIndex::new(&result.runs);
157
158 Ok(SharedLockedRunStore {
159 runs_dir: StoreRunsDir(&self.runs_dir),
160 locked_file: DebugIgnore(file),
161 runs: result.runs,
162 write_permission: result.write_permission,
163 run_id_index,
164 })
165 }
166
167 pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
175 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
176 let file = std::fs::OpenOptions::new()
177 .create(true)
178 .truncate(false)
179 .write(true)
180 .open(&lock_file_path)
181 .map_err(|error| RunStoreError::FileLock {
182 path: lock_file_path.clone(),
183 error,
184 })?;
185
186 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
187 let result = read_runs_json(&self.runs_dir)?;
188
189 Ok(ExclusiveLockedRunStore {
190 runs_dir: StoreRunsDir(&self.runs_dir),
191 locked_file: DebugIgnore(file),
192 runs: result.runs,
193 last_pruned_at: result.last_pruned_at,
194 write_permission: result.write_permission,
195 })
196 }
197}
198
199#[derive(Debug)]
204pub struct ExclusiveLockedRunStore<'store> {
205 runs_dir: StoreRunsDir<'store>,
206 #[expect(dead_code)]
208 locked_file: DebugIgnore<File>,
209 runs: Vec<RecordedRunInfo>,
210 last_pruned_at: Option<DateTime<Utc>>,
211 write_permission: RunsJsonWritePermission,
212}
213
214impl<'store> ExclusiveLockedRunStore<'store> {
215 pub fn runs_dir(&self) -> StoreRunsDir<'store> {
217 self.runs_dir
218 }
219
220 pub fn write_permission(&self) -> RunsJsonWritePermission {
224 self.write_permission
225 }
226
227 pub fn complete_run(
237 &mut self,
238 run_id: ReportUuid,
239 sizes: StoreSizes,
240 status: RecordedRunStatus,
241 duration_secs: Option<f64>,
242 ) -> Result<bool, RunStoreError> {
243 if let RunsJsonWritePermission::Denied {
244 file_version,
245 max_supported_version,
246 } = self.write_permission
247 {
248 return Err(RunStoreError::FormatVersionTooNew {
249 file_version,
250 max_supported_version,
251 });
252 }
253
254 let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
255 if found {
256 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
257 }
258 Ok(found)
259 }
260
261 fn mark_run_completed_inner(
263 &mut self,
264 run_id: ReportUuid,
265 sizes: StoreSizes,
266 status: RecordedRunStatus,
267 duration_secs: Option<f64>,
268 ) -> bool {
269 for run in &mut self.runs {
270 if run.run_id == run_id {
271 run.sizes = RecordedSizes {
272 log: ComponentSizes {
273 compressed: sizes.log.compressed,
274 uncompressed: sizes.log.uncompressed,
275 entries: sizes.log.entries,
276 },
277 store: ComponentSizes {
278 compressed: sizes.store.compressed,
279 uncompressed: sizes.store.uncompressed,
280 entries: sizes.store.entries,
281 },
282 };
283 run.status = status;
284 run.duration_secs = duration_secs;
285 run.last_written_at = Local::now().fixed_offset();
286 return true;
287 }
288 }
289 false
290 }
291
292 pub fn prune(
309 &mut self,
310 policy: &RecordRetentionPolicy,
311 kind: PruneKind,
312 ) -> Result<PruneResult, RunStoreError> {
313 if let RunsJsonWritePermission::Denied {
314 file_version,
315 max_supported_version,
316 } = self.write_permission
317 {
318 return Err(RunStoreError::FormatVersionTooNew {
319 file_version,
320 max_supported_version,
321 });
322 }
323
324 let now = Utc::now();
325 let to_delete: HashSet<_> = policy
326 .compute_runs_to_delete(&self.runs, now)
327 .into_iter()
328 .collect();
329
330 let runs_dir = self.runs_dir();
331 let mut result = if to_delete.is_empty() {
332 PruneResult::default()
333 } else {
334 delete_runs(runs_dir, &mut self.runs, &to_delete)
335 };
336 result.kind = kind;
337
338 let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
339 delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);
340
341 if result.deleted_count > 0 || result.orphans_deleted > 0 {
342 self.last_pruned_at = Some(now);
344 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
345 }
346
347 Ok(result)
348 }
349
350 pub fn prune_if_needed(
360 &mut self,
361 policy: &RecordRetentionPolicy,
362 ) -> Result<Option<PruneResult>, RunStoreError> {
363 const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
364 Some(d) => d,
365 None => panic!("1 day should always be a valid TimeDelta"),
366 };
367 const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;
368
369 let now = Utc::now();
370
371 let time_since_last_prune = self
373 .last_pruned_at
374 .map(|last| now.signed_duration_since(last))
375 .unwrap_or(TimeDelta::MAX);
376
377 let should_prune = time_since_last_prune >= PRUNE_INTERVAL
378 || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);
379
380 if should_prune {
381 Ok(Some(self.prune(policy, PruneKind::Implicit)?))
382 } else {
383 Ok(None)
384 }
385 }
386
387 #[expect(clippy::too_many_arguments)]
399 pub(crate) fn create_run_recorder(
400 mut self,
401 run_id: ReportUuid,
402 nextest_version: Version,
403 started_at: DateTime<FixedOffset>,
404 cli_args: Vec<String>,
405 build_scope_args: Vec<String>,
406 env_vars: BTreeMap<String, String>,
407 max_output_size: bytesize::ByteSize,
408 parent_run_id: Option<ReportUuid>,
409 ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
410 if let RunsJsonWritePermission::Denied {
411 file_version,
412 max_supported_version,
413 } = self.write_permission
414 {
415 return Err(RunStoreError::FormatVersionTooNew {
416 file_version,
417 max_supported_version,
418 });
419 }
420
421 let now = Local::now().fixed_offset();
427 let run = RecordedRunInfo {
428 run_id,
429 store_format_version: store_format_version_for_new_run(),
430 nextest_version,
431 started_at,
432 last_written_at: now,
433 duration_secs: None,
434 cli_args,
435 build_scope_args,
436 env_vars,
437 parent_run_id,
438 sizes: RecordedSizes::default(),
439 status: RecordedRunStatus::Incomplete,
440 };
441 self.runs.push(run);
442
443 if let Some(parent_run_id) = parent_run_id
445 && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
446 {
447 parent_run.last_written_at = now;
448 }
449
450 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
451
452 let index = RunIdIndex::new(&self.runs);
454 let unique_prefix = index
455 .shortest_unique_prefix(run_id)
456 .expect("run was just added to the list");
457
458 let run_dir = self.runs_dir().run_dir(run_id);
463
464 let recorder = RunRecorder::new(run_dir, max_output_size)?;
465 Ok((recorder, unique_prefix))
466 }
467}
468
/// Metadata recorded for a single run in the store's run list.
#[derive(Clone, Debug)]
pub struct RecordedRunInfo {
    /// Unique identifier of the run.
    pub run_id: ReportUuid,
    /// Store format version the run was written with.
    pub store_format_version: StoreFormatVersion,
    /// The nextest version that recorded the run.
    pub nextest_version: Version,
    /// When the run started.
    pub started_at: DateTime<FixedOffset>,
    /// When the run's entry was last updated.
    pub last_written_at: DateTime<FixedOffset>,
    /// Wall-clock duration in seconds, once the run has completed.
    pub duration_secs: Option<f64>,
    /// Command-line arguments the run was invoked with.
    pub cli_args: Vec<String>,
    /// Arguments that define the build scope of the run.
    pub build_scope_args: Vec<String>,
    /// Environment variables captured for the run.
    pub env_vars: BTreeMap<String, String>,
    /// The run this one was derived from, if any.
    pub parent_run_id: Option<ReportUuid>,
    /// On-disk sizes of the run's log and store components.
    pub sizes: RecordedSizes,
    /// Completion status of the run.
    pub status: RecordedRunStatus,
}
509
/// Sizes recorded for the two on-disk components of a run.
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log component.
    pub log: ComponentSizes,
    /// Sizes for the store zip component.
    pub store: ComponentSizes,
}

/// Byte and entry counts for one component of a run.
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Size on disk after compression, in bytes.
    pub compressed: u64,
    /// Size before compression, in bytes.
    pub uncompressed: u64,
    /// Number of entries in the component.
    pub entries: u64,
}

impl RecordedSizes {
    /// Total compressed bytes across the log and store components.
    pub fn total_compressed(&self) -> u64 {
        self.components().map(|c| c.compressed).sum()
    }

    /// Total uncompressed bytes across the log and store components.
    pub fn total_uncompressed(&self) -> u64 {
        self.components().map(|c| c.uncompressed).sum()
    }

    /// Total entry count across the log and store components.
    pub fn total_entries(&self) -> u64 {
        self.components().map(|c| c.entries).sum()
    }

    /// Iterates over both components.
    fn components(&self) -> impl Iterator<Item = ComponentSizes> {
        [self.log, self.store].into_iter()
    }
}
546
/// Completion status recorded for a run.
#[derive(Clone, Debug)]
pub enum RecordedRunStatus {
    /// The run has not been marked complete.
    Incomplete,
    /// The run ran to completion.
    Completed(CompletedRunStats),
    /// The run was cancelled.
    Cancelled(CompletedRunStats),
    /// A stress run that ran to completion.
    StressCompleted(StressCompletedRunStats),
    /// A stress run that was cancelled.
    StressCancelled(StressCompletedRunStats),
    /// A status this nextest version does not understand (written by a newer
    /// version).
    Unknown,
}

impl RecordedRunStatus {
    /// A short human-readable label for this status.
    pub fn short_status_str(&self) -> &'static str {
        match self {
            Self::Completed(_) => "completed",
            Self::Cancelled(_) => "cancelled",
            Self::StressCompleted(_) => "stress completed",
            Self::StressCancelled(_) => "stress cancelled",
            Self::Incomplete => "incomplete",
            Self::Unknown => "unknown",
        }
    }

    /// The recorded exit code, if the run finished with one.
    pub fn exit_code(&self) -> Option<i32> {
        match self {
            Self::Completed(stats) | Self::Cancelled(stats) => Some(stats.exit_code),
            Self::StressCompleted(stats) | Self::StressCancelled(stats) => Some(stats.exit_code),
            Self::Incomplete | Self::Unknown => None,
        }
    }
}

/// Summary statistics for a completed (or cancelled) run.
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// Number of tests initially scheduled to run.
    pub initial_run_count: usize,
    /// Number of tests that passed.
    pub passed: usize,
    /// Number of tests that failed.
    pub failed: usize,
    /// Process exit code of the run.
    pub exit_code: i32,
}

/// Summary statistics for a completed (or cancelled) stress run.
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// Requested iteration count; `None` presumably means unbounded — TODO
    /// confirm against the stress-run driver.
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// Number of successful iterations.
    pub success_count: u32,
    /// Number of failed iterations.
    pub failed_count: u32,
    /// Process exit code of the run.
    pub exit_code: i32,
}
618
/// Whether a recorded run can be replayed.
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// The run is complete and nothing blocks replay.
    Replayable,
    /// The run cannot be replayed for the listed reasons.
    NotReplayable(Vec<NonReplayableReason>),
    /// Nothing blocks replay, but the run is still recorded as incomplete.
    Incomplete,
}
640
/// A reason a recorded run cannot be replayed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run's store format version cannot be read by this nextest version.
    StoreVersionIncompatible {
        /// Details of the version incompatibility.
        incompatibility: StoreVersionIncompatibility,
    },
    /// The run's store zip file is missing from disk.
    MissingStoreZip,
    /// The run's log file is missing from disk.
    MissingRunLog,
    /// The run's status was written by a newer nextest version.
    StatusUnknown,
}
660
661impl fmt::Display for NonReplayableReason {
662 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
663 match self {
664 Self::StoreVersionIncompatible { incompatibility } => {
665 write!(f, "store format version incompatible: {}", incompatibility)
666 }
667 Self::MissingStoreZip => {
668 write!(f, "store.zip is missing")
669 }
670 Self::MissingRunLog => {
671 write!(f, "run.log.zst is missing")
672 }
673 Self::StatusUnknown => {
674 write!(f, "run status is unknown (from a newer nextest version)")
675 }
676 }
677 }
678}
679
/// The outcome of resolving a run ID selector against the run list.
#[derive(Clone, Copy, Debug)]
pub struct ResolveRunIdResult {
    /// The resolved run ID.
    pub run_id: ReportUuid,
}
686
687impl RecordedRunStatus {
688 pub fn passed_count_width(&self) -> usize {
693 match self {
694 Self::Incomplete | Self::Unknown => 0,
695 Self::Completed(stats) | Self::Cancelled(stats) => decimal_char_width(stats.passed),
696 Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
697 decimal_char_width(stats.success_count)
698 }
699 }
700 }
701}
702
703impl RecordedRunInfo {
704 pub fn check_replayability(&self, files: &dyn RunFilesExist) -> ReplayabilityStatus {
716 let mut blocking = Vec::new();
717 let mut is_incomplete = false;
718
719 if let Err(incompatibility) = self
721 .store_format_version
722 .check_readable_by(STORE_FORMAT_VERSION)
723 {
724 blocking.push(NonReplayableReason::StoreVersionIncompatible { incompatibility });
725 }
726
727 if !files.store_zip_exists() {
729 blocking.push(NonReplayableReason::MissingStoreZip);
730 }
731 if !files.run_log_exists() {
732 blocking.push(NonReplayableReason::MissingRunLog);
733 }
734
735 match &self.status {
737 RecordedRunStatus::Unknown => {
738 blocking.push(NonReplayableReason::StatusUnknown);
739 }
740 RecordedRunStatus::Incomplete => {
741 is_incomplete = true;
742 }
743 RecordedRunStatus::Completed(_)
744 | RecordedRunStatus::Cancelled(_)
745 | RecordedRunStatus::StressCompleted(_)
746 | RecordedRunStatus::StressCancelled(_) => {
747 }
749 }
750
751 if !blocking.is_empty() {
753 ReplayabilityStatus::NotReplayable(blocking)
754 } else if is_incomplete {
755 ReplayabilityStatus::Incomplete
756 } else {
757 ReplayabilityStatus::Replayable
758 }
759 }
760
761 pub fn display<'a>(
773 &'a self,
774 run_id_index: &'a RunIdIndex,
775 replayability: &'a ReplayabilityStatus,
776 alignment: RunListAlignment,
777 styles: &'a Styles,
778 redactor: &'a Redactor,
779 ) -> DisplayRecordedRunInfo<'a> {
780 DisplayRecordedRunInfo::new(
781 self,
782 run_id_index,
783 replayability,
784 alignment,
785 styles,
786 redactor,
787 )
788 }
789
790 pub fn display_detailed<'a>(
804 &'a self,
805 run_id_index: &'a RunIdIndex,
806 replayability: &'a ReplayabilityStatus,
807 now: DateTime<Utc>,
808 styles: &'a Styles,
809 theme_characters: &'a ThemeCharacters,
810 redactor: &'a Redactor,
811 ) -> DisplayRecordedRunInfoDetailed<'a> {
812 DisplayRecordedRunInfoDetailed::new(
813 self,
814 run_id_index,
815 replayability,
816 now,
817 styles,
818 theme_characters,
819 redactor,
820 )
821 }
822}
823
/// Data loaded from `runs.json.zst` by [`read_runs_json`].
struct ReadRunsJsonResult {
    // All recorded runs.
    runs: Vec<RecordedRunInfo>,
    // When the store was last pruned, if ever.
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest version may rewrite the file.
    write_permission: RunsJsonWritePermission,
}
830
831fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
834 let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
835 let file = match File::open(&runs_json_path) {
836 Ok(file) => file,
837 Err(error) => {
838 if error.kind() == io::ErrorKind::NotFound {
839 return Ok(ReadRunsJsonResult {
841 runs: Vec::new(),
842 last_pruned_at: None,
843 write_permission: RunsJsonWritePermission::Allowed,
844 });
845 } else {
846 return Err(RunStoreError::RunListRead {
847 path: runs_json_path,
848 error,
849 });
850 }
851 }
852 };
853
854 let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
855 path: runs_json_path.clone(),
856 error,
857 })?;
858
859 let list: RecordedRunList =
860 serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
861 path: runs_json_path,
862 error,
863 })?;
864 let write_permission = list.write_permission();
865 let data = list.into_data();
866 Ok(ReadRunsJsonResult {
867 runs: data.runs,
868 last_pruned_at: data.last_pruned_at,
869 write_permission,
870 })
871}
872
873fn write_runs_json(
875 runs_dir: &Utf8Path,
876 runs: &[RecordedRunInfo],
877 last_pruned_at: Option<DateTime<Utc>>,
878) -> Result<(), RunStoreError> {
879 let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
880 let list = RecordedRunList::from_data(runs, last_pruned_at);
881
882 atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
883 .write(|file| {
884 let mut encoder = zstd::stream::Encoder::new(file, 3)?;
886 serde_json::to_writer_pretty(&mut encoder, &list)?;
887 encoder.finish()?;
888 Ok(())
889 })
890 .map_err(|error| RunStoreError::RunListWrite {
891 path: runs_json_path,
892 error,
893 })?;
894
895 Ok(())
896}
897
/// A run store whose lock file is held shared, permitting reads of the run
/// list but no mutation.
#[derive(Debug)]
pub struct SharedLockedRunStore<'store> {
    // Directory containing one subdirectory per recorded run.
    runs_dir: StoreRunsDir<'store>,
    #[expect(dead_code, reason = "held for lock duration")]
    locked_file: DebugIgnore<File>,
    // Runs loaded from runs.json.zst at lock time.
    runs: Vec<RecordedRunInfo>,
    // Whether this nextest version may rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
    // Prefix-resolution index built over `runs`.
    run_id_index: RunIdIndex,
}
911
912impl<'store> SharedLockedRunStore<'store> {
913 pub fn into_snapshot(self) -> RunStoreSnapshot {
916 RunStoreSnapshot {
917 runs_dir: self.runs_dir.as_path().to_owned(),
918 runs: self.runs,
919 write_permission: self.write_permission,
920 run_id_index: self.run_id_index,
921 }
922 }
923}
924
/// An owned, point-in-time view of the run store taken while a shared lock
/// was held.
#[derive(Debug)]
pub struct RunStoreSnapshot {
    // Path of the runs directory the snapshot was taken from.
    runs_dir: Utf8PathBuf,
    // Runs as of the snapshot.
    runs: Vec<RecordedRunInfo>,
    // Whether this nextest version may rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
    // Prefix-resolution index built over `runs`.
    run_id_index: RunIdIndex,
}
933
934impl RunStoreSnapshot {
935 pub fn runs_dir(&self) -> StoreRunsDir<'_> {
937 StoreRunsDir(&self.runs_dir)
938 }
939
940 pub fn write_permission(&self) -> RunsJsonWritePermission {
944 self.write_permission
945 }
946
947 pub fn runs(&self) -> &[RecordedRunInfo] {
949 &self.runs
950 }
951
952 pub fn run_count(&self) -> usize {
954 self.runs.len()
955 }
956
957 pub fn total_size(&self) -> u64 {
959 self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
960 }
961
962 pub fn resolve_run_id(
968 &self,
969 selector: &RunIdSelector,
970 ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
971 match selector {
972 RunIdSelector::Latest => self.most_recent_run(),
973 RunIdSelector::Prefix(prefix) => {
974 let run_id = self.resolve_run_id_prefix(prefix)?;
975 Ok(ResolveRunIdResult { run_id })
976 }
977 }
978 }
979
980 fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
986 self.run_id_index.resolve_prefix(prefix).map_err(|err| {
987 match err {
988 PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
989 prefix: prefix.to_string(),
990 },
991 PrefixResolutionError::Ambiguous { count, candidates } => {
992 let mut candidates: Vec<_> = candidates
994 .into_iter()
995 .filter_map(|run_id| self.get_run(run_id).cloned())
996 .collect();
997 candidates.sort_by_key(|run| std::cmp::Reverse(run.started_at));
998 RunIdResolutionError::Ambiguous {
999 prefix: prefix.to_string(),
1000 count,
1001 candidates,
1002 run_id_index: self.run_id_index.clone(),
1003 }
1004 }
1005 PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
1006 prefix: prefix.to_string(),
1007 },
1008 }
1009 })
1010 }
1011
1012 pub fn run_id_index(&self) -> &RunIdIndex {
1014 &self.run_id_index
1015 }
1016
1017 pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
1019 self.runs.iter().find(|r| r.run_id == run_id)
1020 }
1021
1022 pub fn most_recent_run(&self) -> Result<ResolveRunIdResult, RunIdResolutionError> {
1026 self.runs
1027 .iter()
1028 .max_by_key(|r| r.started_at)
1029 .map(|r| ResolveRunIdResult { run_id: r.run_id })
1030 .ok_or(RunIdResolutionError::NoRuns)
1031 }
1032
1033 pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
1039 PrunePlan::compute(&self.runs, policy)
1040 }
1041}
1042
/// A [`RunStoreSnapshot`] augmented with a precomputed replayability status
/// for every run.
#[derive(Debug)]
pub struct SnapshotWithReplayability<'a> {
    // The underlying snapshot.
    snapshot: &'a RunStoreSnapshot,
    // Replayability status for each run in the snapshot.
    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
    // The most recently started run, if any.
    latest_run_id: Option<ReportUuid>,
}
1054
1055impl<'a> SnapshotWithReplayability<'a> {
1056 pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
1061 let runs_dir = snapshot.runs_dir();
1062 let replayability: HashMap<_, _> = snapshot
1063 .runs()
1064 .iter()
1065 .map(|run| {
1066 (
1067 run.run_id,
1068 run.check_replayability(&runs_dir.run_files(run.run_id)),
1069 )
1070 })
1071 .collect();
1072
1073 let latest_run_id = snapshot.most_recent_run().ok().map(|r| r.run_id);
1075
1076 Self {
1077 snapshot,
1078 replayability,
1079 latest_run_id,
1080 }
1081 }
1082
1083 pub fn snapshot(&self) -> &'a RunStoreSnapshot {
1085 self.snapshot
1086 }
1087
1088 pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
1090 &self.replayability
1091 }
1092
1093 pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
1100 self.replayability
1101 .get(&run_id)
1102 .expect("run ID should be in replayability map")
1103 }
1104
1105 pub fn latest_run_id(&self) -> Option<ReportUuid> {
1107 self.latest_run_id
1108 }
1109}
1110
#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Test-only constructor that marks every run replayable without touching
    /// the filesystem.
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let mut replayability = HashMap::new();
        for run in snapshot.runs() {
            replayability.insert(run.run_id, ReplayabilityStatus::Replayable);
        }

        let latest_run_id = snapshot
            .runs()
            .iter()
            .max_by_key(|run| run.started_at)
            .map(|run| run.run_id);

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}
1137
#[cfg(test)]
impl RunStoreSnapshot {
    /// Test-only constructor that fabricates a snapshot over `runs` with a
    /// fixed fake runs directory.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        use super::run_id_index::RunIdIndex;

        let runs_dir = Utf8PathBuf::from("/test/runs");
        let run_id_index = RunIdIndex::new(&runs);
        Self {
            runs_dir,
            runs,
            write_permission: RunsJsonWritePermission::Allowed,
            run_id_index,
        }
    }
}
1153
/// The kind of file lock to acquire on the runs lock file.
#[derive(Clone, Copy)]
enum LockKind {
    // Multiple readers may hold the lock at once.
    Shared,
    // A single writer holds the lock.
    Exclusive,
}
1160
1161fn acquire_lock_with_retry(
1166 file: &File,
1167 lock_file_path: &Utf8Path,
1168 kind: LockKind,
1169) -> Result<(), RunStoreError> {
1170 const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
1171 const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1172
1173 let start = Instant::now();
1174 loop {
1175 let result = match kind {
1176 LockKind::Shared => file.try_lock_shared(),
1177 LockKind::Exclusive => file.try_lock(),
1178 };
1179
1180 match result {
1181 Ok(()) => return Ok(()),
1182 Err(TryLockError::WouldBlock) => {
1183 if start.elapsed() >= LOCK_TIMEOUT {
1185 return Err(RunStoreError::FileLockTimeout {
1186 path: lock_file_path.to_owned(),
1187 timeout_secs: LOCK_TIMEOUT.as_secs(),
1188 });
1189 }
1190 thread::sleep(LOCK_RETRY_INTERVAL);
1191 }
1192 Err(TryLockError::Error(error)) => {
1193 return Err(RunStoreError::FileLock {
1195 path: lock_file_path.to_owned(),
1196 error,
1197 });
1198 }
1199 }
1200 }
1201}