1use crate::app::{
13 CuDistributedReplayApplication, CuRecordedReplayApplication, CuSimApplication, Subsystem,
14};
15use crate::config::{MultiCopperConfig, read_configuration_str, read_multi_configuration};
16use crate::copperlist::CopperList;
17use crate::curuntime::{
18 KeyFrame, RuntimeLifecycleConfigSource, RuntimeLifecycleEvent, RuntimeLifecycleRecord,
19 RuntimeLifecycleStackInfo,
20};
21use crate::debug::{
22 SectionIndexEntry, build_read_logger, decode_copperlists, index_log, read_section_at,
23};
24use crate::simulation::recorded_copperlist_timestamp;
25use bincode::config::standard;
26use bincode::decode_from_std_read;
27use bincode::error::DecodeError;
28use cu29_clock::{RobotClock, RobotClockMock};
29use cu29_traits::{CopperListTuple, CuError, CuResult, ErasedCuStampedDataSet, UnifiedLogType};
30use cu29_unifiedlog::memmap::MmapSectionStorage;
31use cu29_unifiedlog::{
32 NoopLogger, NoopSectionStorage, SectionStorage, UnifiedLogWrite, UnifiedLogger,
33 UnifiedLoggerBuilder, UnifiedLoggerIOReader, UnifiedLoggerRead, UnifiedLoggerWrite,
34};
35use std::any::type_name;
36use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
37use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
38use std::fs;
39use std::io::Read;
40use std::path::{Path, PathBuf};
41use std::sync::{Arc, Mutex};
42
/// Metadata extracted from a single recorded Copper unified log, used to
/// match the log to a subsystem instance during distributed replay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedReplayLog {
    /// Base path of the unified log this metadata was discovered from.
    pub base_path: PathBuf,
    /// Stack info recorded at instantiation (instance id, subsystem code/id).
    pub stack: RuntimeLifecycleStackInfo,
    /// Where the effective configuration originally came from.
    pub config_source: RuntimeLifecycleConfigSource,
    /// The effective configuration captured at record time, as RON text.
    pub effective_config_ron: String,
    /// Mission from the first `MissionStarted` lifecycle event, if any.
    pub mission: Option<String>,
}
52
impl DistributedReplayLog {
    /// Discovers replay metadata for the unified log at `path`.
    ///
    /// The path is tried exactly as given first; if that fails and
    /// `normalize_candidate_log_base` yields a different path, the normalized
    /// variant is tried before the original error is surfaced.
    pub fn discover(path: impl AsRef<Path>) -> CuResult<Self> {
        let requested_path = path.as_ref();
        let normalized_path = normalize_candidate_log_base(requested_path);
        match Self::discover_from_base_path(requested_path) {
            Ok(log) => Ok(log),
            // Retry only when normalization actually changed the path.
            Err(_) if normalized_path != requested_path => {
                Self::discover_from_base_path(&normalized_path)
            }
            Err(err) => Err(err),
        }
    }

    /// Opens the unified log at `base_path` read-only and scans its
    /// `RuntimeLifecycle` stream for the first `Instantiated` record
    /// (required) and the first `MissionStarted` record (optional).
    fn discover_from_base_path(base_path: &Path) -> CuResult<Self> {
        let UnifiedLogger::Read(read_logger) = UnifiedLoggerBuilder::new()
            .file_base_name(base_path)
            .build()
            .map_err(|err| {
                CuError::new_with_cause(
                    &format!(
                        "Failed to open Copper log '{}' for distributed replay discovery",
                        base_path.display()
                    ),
                    err,
                )
            })?
        else {
            return Err(CuError::from(
                "Expected a readable unified logger during distributed replay discovery",
            ));
        };

        let mut reader = UnifiedLoggerIOReader::new(read_logger, UnifiedLogType::RuntimeLifecycle);
        // (config_source, effective_config_ron, stack) of the FIRST Instantiated event.
        let mut instantiated: Option<(
            RuntimeLifecycleConfigSource,
            String,
            RuntimeLifecycleStackInfo,
        )> = None;
        let mut mission = None;

        while let Some(record) =
            read_next_entry::<RuntimeLifecycleRecord>(&mut reader).map_err(|err| {
                CuError::from(format!(
                    "Failed to decode runtime lifecycle for '{}': {err}",
                    base_path.display()
                ))
            })?
        {
            match record.event {
                // The `is_none()` guards keep only the first occurrence of each
                // event of interest; later duplicates fall through to `_`.
                RuntimeLifecycleEvent::Instantiated {
                    config_source,
                    effective_config_ron,
                    stack,
                } if instantiated.is_none() => {
                    instantiated = Some((config_source, effective_config_ron, stack));
                }
                RuntimeLifecycleEvent::MissionStarted {
                    mission: started_mission,
                } if mission.is_none() => {
                    mission = Some(started_mission);
                }
                _ => {}
            }

            // Stop scanning as soon as both pieces of metadata were found.
            if instantiated.is_some() && mission.is_some() {
                break;
            }
        }

        let Some((config_source, effective_config_ron, stack)) = instantiated else {
            return Err(CuError::from(format!(
                "Copper log '{}' has no RuntimeLifecycle::Instantiated record",
                base_path.display()
            )));
        };

        Ok(Self {
            base_path: base_path.to_path_buf(),
            stack,
            config_source,
            effective_config_ron,
            mission,
        })
    }

    /// Instance id recorded in the log's stack info.
    #[inline]
    pub fn instance_id(&self) -> u32 {
        self.stack.instance_id
    }

    /// Numeric subsystem code recorded in the log's stack info.
    #[inline]
    pub fn subsystem_code(&self) -> u16 {
        self.stack.subsystem_code
    }

    /// Subsystem id string, if one was recorded.
    #[inline]
    pub fn subsystem_id(&self) -> Option<&str> {
        self.stack.subsystem_id.as_deref()
    }
}
155
/// A candidate log path that could not be opened or decoded during
/// discovery, together with the stringified error.
#[derive(Debug, Clone)]
pub struct DistributedReplayDiscoveryFailure {
    /// The base path that was probed.
    pub candidate_path: PathBuf,
    /// Human-readable description of why discovery failed.
    pub error: String,
}
162
163impl Display for DistributedReplayDiscoveryFailure {
164 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
165 write!(
166 f,
167 "{}: {}",
168 self.candidate_path.display(),
169 self.error.as_str()
170 )
171 }
172}
173
/// Result of probing a set of paths for replay logs: the logs that parsed,
/// plus every candidate that failed (kept so callers can surface diagnostics).
#[derive(Debug, Clone, Default)]
pub struct DistributedReplayCatalog {
    /// Sorted by (instance id, subsystem code, subsystem id, base path).
    pub logs: Vec<DistributedReplayLog>,
    /// Sorted by candidate path.
    pub failures: Vec<DistributedReplayDiscoveryFailure>,
}
180
181impl DistributedReplayCatalog {
182 pub fn discover<I, P>(inputs: I) -> CuResult<Self>
187 where
188 I: IntoIterator<Item = P>,
189 P: AsRef<Path>,
190 {
191 let mut candidates = BTreeSet::new();
192 for input in inputs {
193 collect_candidate_base_paths(input.as_ref(), &mut candidates)?;
194 }
195
196 let mut logs = Vec::new();
197 let mut failures = Vec::new();
198
199 for candidate in candidates {
200 match DistributedReplayLog::discover(&candidate) {
201 Ok(log) => logs.push(log),
202 Err(err) => failures.push(DistributedReplayDiscoveryFailure {
203 candidate_path: candidate,
204 error: err.to_string(),
205 }),
206 }
207 }
208
209 logs.sort_by(|left, right| {
210 (
211 left.instance_id(),
212 left.subsystem_code(),
213 left.subsystem_id(),
214 left.base_path.as_os_str(),
215 )
216 .cmp(&(
217 right.instance_id(),
218 right.subsystem_code(),
219 right.subsystem_id(),
220 right.base_path.as_os_str(),
221 ))
222 });
223 failures.sort_by(|left, right| left.candidate_path.cmp(&right.candidate_path));
224
225 Ok(Self { logs, failures })
226 }
227
228 pub fn discover_under(root: impl AsRef<Path>) -> CuResult<Self> {
230 Self::discover([root])
231 }
232}
233
/// Factory signature used to erase the concrete application type when
/// constructing a replay session for one assignment.
type DistributedReplaySessionFactory = fn(
    &DistributedReplayAssignment,
    &DistributedReplaySessionConfig,
) -> CuResult<DistributedReplaySessionBuild>;

/// Maximum number of decoded log sections kept in each session's LRU cache.
const DEFAULT_SECTION_CACHE_CAP: usize = 8;
/// Fallback preallocation size (64 MiB) for replay output logs.
const DEFAULT_REPLAY_LOG_SIZE_BYTES: usize = 64 * 1024 * 1024;

/// Options shared by every session in a replay engine run.
#[derive(Debug, Clone, Default)]
struct DistributedReplaySessionConfig {
    /// When set, replayed applications write fresh unified logs under this root.
    output_root: Option<PathBuf>,
}

/// Identity of one recorded CopperList: which instance and subsystem produced
/// it, and its id within that log. Ord/Hash derived for use as a map/set key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct DistributedReplayOriginKey {
    instance_id: u32,
    subsystem_code: u16,
    cl_id: u64,
}

/// Public cursor pointing at one CopperList of one subsystem instance.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistributedReplayCursor {
    pub instance_id: u32,
    pub subsystem_id: String,
    pub cl_id: u64,
    // Private; exposed read-only via `subsystem_code()`.
    subsystem_code: u16,
}
261
262impl DistributedReplayCursor {
263 #[inline]
264 fn new(instance_id: u32, subsystem_id: String, subsystem_code: u16, cl_id: u64) -> Self {
265 Self {
266 instance_id,
267 subsystem_id,
268 cl_id,
269 subsystem_code,
270 }
271 }
272
273 #[inline]
274 pub fn subsystem_code(&self) -> u16 {
275 self.subsystem_code
276 }
277}
278
/// Per-CopperList descriptor emitted by a session before scheduling: the
/// list's identity plus the origins of the messages it consumed.
#[derive(Debug, Clone)]
struct DistributedReplayNodeDescriptor {
    cursor: DistributedReplayCursor,
    /// This CopperList's own identity, usable as a dependency target.
    origin_key: DistributedReplayOriginKey,
    /// Origins this CopperList depends on (its inputs' producers).
    incoming_origins: BTreeSet<DistributedReplayOriginKey>,
}

/// Node of the cross-session dependency graph used to order replay.
#[derive(Debug, Clone)]
struct DistributedReplayGraphNode {
    cursor: DistributedReplayCursor,
    /// Index of the owning session.
    session_index: usize,
    /// Graph-node indices unblocked when this node completes.
    outgoing: Vec<usize>,
    /// Dependency count at graph-construction time.
    initial_dependencies: usize,
    /// Dependencies still unsatisfied; the node is ready at zero.
    remaining_dependencies: usize,
}

/// Ready-queue entry; the derived `Ord` (field order matters) makes the
/// scheduler pop nodes in deterministic (instance, subsystem, cl_id) order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DistributedReplayReadyNode {
    instance_id: u32,
    subsystem_code: u16,
    cl_id: u64,
    node_index: usize,
}

/// Result of a session factory: the type-erased session, its node
/// descriptors, and the output log path when recording was requested.
struct DistributedReplaySessionBuild {
    session: Box<dyn DistributedReplaySession>,
    nodes: Vec<DistributedReplayNodeDescriptor>,
    output_log_path: Option<PathBuf>,
}

/// Type-erased replay session: seek to a CopperList id, or shut down.
trait DistributedReplaySession {
    fn goto_cl(&mut self, cl_id: u64) -> CuResult<()>;
    fn shutdown(&mut self) -> CuResult<()>;
}

/// One fully decoded log section held in the session's LRU cache.
#[derive(Debug, Clone)]
struct RecordedReplayCachedSection<P: CopperListTuple> {
    entries: Vec<Arc<CopperList<P>>>,
}

/// Replay session over one recorded unified log for application `App` with
/// payload tuple `P`, section storage `S`, and log writer `L`.
struct RecordedReplaySession<App, P, S, L>
where
    App: CuDistributedReplayApplication<S, L>,
    P: CopperListTuple,
    S: SectionStorage,
    L: UnifiedLogWrite<S> + 'static,
{
    assignment: DistributedReplayAssignment,
    app: App,
    /// Mock half of the app's clock; replay sets time explicitly.
    clock_mock: RobotClockMock,
    log_reader: UnifiedLoggerRead,
    /// Index of CopperList sections in the log, in file order.
    sections: Vec<SectionIndexEntry>,
    /// Total number of recorded CopperLists across all sections.
    total_entries: usize,
    keyframes: Vec<KeyFrame>,
    /// Whether tasks have been started via `ensure_started`.
    started: bool,
    /// Global index of the last replayed CopperList, if any.
    current_idx: Option<usize>,
    /// Id of the last restored keyframe, if any.
    last_keyframe: Option<u64>,
    /// Decoded-section cache keyed by section index.
    cache: HashMap<usize, RecordedReplayCachedSection<P>>,
    /// LRU order for `cache`: front is the eviction candidate.
    cache_order: VecDeque<usize>,
    cache_cap: usize,
    phantom: std::marker::PhantomData<(S, L)>,
}
341
impl<App, P, S, L> RecordedReplaySession<App, P, S, L>
where
    App: CuDistributedReplayApplication<S, L>
        + CuRecordedReplayApplication<S, L, RecordedDataSet = P>,
    P: CopperListTuple + 'static,
    S: SectionStorage,
    L: UnifiedLogWrite<S> + 'static,
{
    /// Builds a session by indexing the recorded unified log at `log_base`.
    ///
    /// Publishes the recorded effective config for payload type `P`, then
    /// indexes the log's CopperList sections and keyframes; payloads are
    /// decoded lazily, section by section, via the cache.
    fn from_log(
        assignment: DistributedReplayAssignment,
        app: App,
        clock_mock: RobotClockMock,
        log_base: &Path,
    ) -> CuResult<Self> {
        crate::logcodec::set_effective_config_ron::<P>(&assignment.log.effective_config_ron);
        let (sections, keyframes, total_entries) =
            index_log::<P, _>(log_base, &recorded_copperlist_timestamp::<P>)?;
        let log_reader = build_read_logger(log_base)?;
        Ok(Self {
            assignment,
            app,
            clock_mock,
            log_reader,
            sections,
            total_entries,
            keyframes,
            started: false,
            current_idx: None,
            last_keyframe: None,
            cache: HashMap::new(),
            cache_order: VecDeque::new(),
            cache_cap: DEFAULT_SECTION_CACHE_CAP,
            phantom: std::marker::PhantomData,
        })
    }

    /// Produces one descriptor per recorded CopperList, in log order, pairing
    /// each list's identity with the origins of the messages it consumed.
    fn describe_nodes(&mut self) -> CuResult<Vec<DistributedReplayNodeDescriptor>> {
        let mut nodes = Vec::with_capacity(self.total_entries);
        for idx in 0..self.total_entries {
            let (copperlist, _) = self.copperlist_at(idx)?;
            let cursor = DistributedReplayCursor::new(
                self.assignment.instance_id,
                self.assignment.subsystem_id.clone(),
                self.assignment.log.subsystem_code(),
                copperlist.id,
            );
            nodes.push(DistributedReplayNodeDescriptor {
                origin_key: DistributedReplayOriginKey {
                    instance_id: cursor.instance_id,
                    subsystem_code: cursor.subsystem_code(),
                    cl_id: cursor.cl_id,
                },
                incoming_origins: copperlist_origins(copperlist.as_ref()),
                cursor,
            });
        }
        Ok(nodes)
    }

    /// Starts all application tasks once; subsequent calls are no-ops.
    fn ensure_started(&mut self) -> CuResult<()> {
        if self.started {
            return Ok(());
        }
        // Replay never overrides steps: let the runtime execute everything.
        let mut noop = |_step: App::Step<'_>| crate::simulation::SimOverride::ExecuteByRuntime;
        <App as CuSimApplication<S, L>>::start_all_tasks(&mut self.app, &mut noop)?;
        self.started = true;
        Ok(())
    }

    /// Latest keyframe at or before `target_cl_id`, if any.
    fn nearest_keyframe(&self, target_cl_id: u64) -> Option<KeyFrame> {
        self.keyframes
            .iter()
            .filter(|keyframe| keyframe.culistid <= target_cl_id)
            .max_by_key(|keyframe| keyframe.culistid)
            .cloned()
    }

    /// Restores application state from `keyframe` and rewinds the mock clock
    /// to the keyframe's timestamp.
    fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
        <App as CuSimApplication<S, L>>::restore_keyframe(&mut self.app, keyframe)?;
        self.clock_mock.set_value(keyframe.timestamp.as_nanos());
        self.last_keyframe = Some(keyframe.culistid);
        Ok(())
    }

    /// Binary search for the section containing global entry index `idx`.
    /// Relies on `sections` being sorted and contiguous by `start_idx`.
    fn find_section_for_index(&self, idx: usize) -> Option<usize> {
        self.sections
            .binary_search_by(|section| {
                if idx < section.start_idx {
                    std::cmp::Ordering::Greater
                } else if idx >= section.start_idx + section.len {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .ok()
    }

    /// Binary search for the section whose [first_id, last_id] range contains
    /// `cl_id`. Relies on sections being sorted by id range.
    fn find_section_for_cl_id(&self, cl_id: u64) -> Option<usize> {
        self.sections
            .binary_search_by(|section| {
                if cl_id < section.first_id {
                    std::cmp::Ordering::Greater
                } else if cl_id > section.last_id {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .ok()
    }

    /// Marks `key` as most recently used and evicts the least recently used
    /// sections until the cache is back within `cache_cap`.
    fn touch_cache(&mut self, key: usize) {
        if let Some(position) = self.cache_order.iter().position(|entry| *entry == key) {
            self.cache_order.remove(position);
        }
        self.cache_order.push_back(key);
        while self.cache_order.len() > self.cache_cap {
            if let Some(oldest) = self.cache_order.pop_front() {
                self.cache.remove(&oldest);
            }
        }
    }

    /// Returns the decoded section `section_idx`, reading and decoding it
    /// from the log on a cache miss.
    fn load_section(&mut self, section_idx: usize) -> CuResult<&RecordedReplayCachedSection<P>> {
        // NOTE(review): contains_key + get is a double lookup; the entry API
        // is awkward here because the miss path re-borrows self mutably.
        if self.cache.contains_key(&section_idx) {
            self.touch_cache(section_idx);
            return Ok(self.cache.get(&section_idx).expect("cache entry exists"));
        }

        let entry = &self.sections[section_idx];
        let (header, data) = read_section_at(&mut self.log_reader, entry.pos)?;
        if header.entry_type != UnifiedLogType::CopperList {
            return Err(CuError::from(
                "Section type mismatch while loading distributed replay copperlists",
            ));
        }
        let (entries, _) = decode_copperlists::<P, _>(&data, &recorded_copperlist_timestamp::<P>)?;
        self.cache
            .insert(section_idx, RecordedReplayCachedSection { entries });
        self.touch_cache(section_idx);
        Ok(self.cache.get(&section_idx).expect("cache entry exists"))
    }

    /// Fetches the CopperList at global index `idx`, plus the keyframe that
    /// was recorded for it (matched by CopperList id), if any.
    fn copperlist_at(&mut self, idx: usize) -> CuResult<(Arc<CopperList<P>>, Option<KeyFrame>)> {
        let section_idx = self
            .find_section_for_index(idx)
            .ok_or_else(|| CuError::from("Distributed replay index is outside the log"))?;
        let start_idx = self.sections[section_idx].start_idx;
        let section = self.load_section(section_idx)?;
        let local_idx = idx - start_idx;
        let copperlist = section
            .entries
            .get(local_idx)
            .ok_or_else(|| CuError::from("Corrupt distributed replay section index"))?
            .clone();
        let keyframe = self
            .keyframes
            .iter()
            .find(|keyframe| keyframe.culistid == copperlist.id)
            .cloned();
        Ok((copperlist, keyframe))
    }

    /// Maps a CopperList id to its global entry index by locating its section
    /// and scanning that section's entries.
    fn index_for_cl_id(&mut self, cl_id: u64) -> CuResult<usize> {
        let section_idx = self
            .find_section_for_cl_id(cl_id)
            .ok_or_else(|| CuError::from("Requested CopperList id is not present in the log"))?;
        let start_idx = self.sections[section_idx].start_idx;
        let section = self.load_section(section_idx)?;
        for (offset, copperlist) in section.entries.iter().enumerate() {
            if copperlist.id == cl_id {
                return Ok(start_idx + offset);
            }
        }
        // Ids within the section's advertised range can still be absent
        // (gaps), which indicates a corrupt or stale index.
        Err(CuError::from(
            "Requested CopperList id is missing from its indexed log section",
        ))
    }

    /// Replays entries `start_idx..=end_idx` in order, advancing
    /// `current_idx` after each one.
    ///
    /// `replay_keyframe` (from a restore) takes precedence over the per-entry
    /// keyframe; either is forwarded only when its id matches the entry.
    fn replay_range(
        &mut self,
        start_idx: usize,
        end_idx: usize,
        replay_keyframe: Option<&KeyFrame>,
    ) -> CuResult<()> {
        for idx in start_idx..=end_idx {
            let (copperlist, keyframe) = self.copperlist_at(idx)?;
            let keyframe = replay_keyframe
                .filter(|candidate| candidate.culistid == copperlist.id)
                .or(keyframe
                    .as_ref()
                    .filter(|candidate| candidate.culistid == copperlist.id));
            <App as CuRecordedReplayApplication<S, L>>::replay_recorded_copperlist(
                &mut self.app,
                &self.clock_mock,
                copperlist.as_ref(),
                keyframe,
            )?;
            self.current_idx = Some(idx);
        }
        Ok(())
    }

    /// Seeks the session to global entry index `target_idx`.
    ///
    /// Moving forward replays incrementally from the current position;
    /// moving backward (or starting fresh) restores the nearest keyframe at
    /// or before the target and replays forward from it.
    fn goto_index(&mut self, target_idx: usize) -> CuResult<()> {
        self.ensure_started()?;
        if target_idx >= self.total_entries {
            return Err(CuError::from(
                "Distributed replay target is outside the log",
            ));
        }

        let (target_copperlist, _) = self.copperlist_at(target_idx)?;
        let target_cl_id = target_copperlist.id;

        let replay_start_idx;
        let replay_keyframe;

        if let Some(current_idx) = self.current_idx {
            if current_idx == target_idx {
                // Already positioned; nothing to do.
                return Ok(());
            }

            if target_idx > current_idx {
                // Forward seek: continue from the next entry, no restore.
                replay_start_idx = current_idx + 1;
                replay_keyframe = None;
            } else {
                // Backward seek: rewind via keyframe, then replay forward.
                let keyframe = self.nearest_keyframe(target_cl_id).ok_or_else(|| {
                    CuError::from("No keyframe is available to rewind distributed replay")
                })?;
                self.restore_keyframe(&keyframe)?;
                replay_start_idx = self.index_for_cl_id(keyframe.culistid)?;
                replay_keyframe = Some(keyframe);
            }
        } else {
            // First seek: initialize from the nearest keyframe.
            let keyframe = self.nearest_keyframe(target_cl_id).ok_or_else(|| {
                CuError::from("No keyframe is available to initialize distributed replay")
            })?;
            self.restore_keyframe(&keyframe)?;
            replay_start_idx = self.index_for_cl_id(keyframe.culistid)?;
            replay_keyframe = Some(keyframe);
        }

        self.replay_range(replay_start_idx, target_idx, replay_keyframe.as_ref())
    }
}
588
589impl<App, P, S, L> DistributedReplaySession for RecordedReplaySession<App, P, S, L>
590where
591 App: CuDistributedReplayApplication<S, L>
592 + CuRecordedReplayApplication<S, L, RecordedDataSet = P>,
593 P: CopperListTuple + 'static,
594 S: SectionStorage,
595 L: UnifiedLogWrite<S> + 'static,
596{
597 fn goto_cl(&mut self, cl_id: u64) -> CuResult<()> {
598 let target_idx = self.index_for_cl_id(cl_id)?;
599 self.goto_index(target_idx)
600 }
601
602 fn shutdown(&mut self) -> CuResult<()> {
603 if !self.started {
604 return Ok(());
605 }
606
607 let mut noop = |_step: App::Step<'_>| crate::simulation::SimOverride::ExecuteByRuntime;
608 <App as CuSimApplication<S, L>>::stop_all_tasks(&mut self.app, &mut noop)?;
609 self.started = false;
610 Ok(())
611 }
612}
613
/// Pairing of a subsystem with the (type-erased) factory that can build a
/// replay session for its generated application type.
#[derive(Clone)]
pub struct DistributedReplayAppRegistration {
    pub subsystem: Subsystem,
    /// `type_name` of the registered application, for diagnostics.
    pub app_type_name: &'static str,
    // Plain fn pointer so the registration stays `Clone` without boxing.
    session_factory: DistributedReplaySessionFactory,
}
621
622impl Debug for DistributedReplayAppRegistration {
623 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
624 f.debug_struct("DistributedReplayAppRegistration")
625 .field("subsystem", &self.subsystem)
626 .field("app_type_name", &self.app_type_name)
627 .finish()
628 }
629}
630
631impl PartialEq for DistributedReplayAppRegistration {
632 fn eq(&self, other: &Self) -> bool {
633 self.subsystem == other.subsystem && self.app_type_name == other.app_type_name
634 }
635}
636
637impl Eq for DistributedReplayAppRegistration {}
638
/// A fully resolved (instance, subsystem) → (log, app registration) pairing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedReplayAssignment {
    pub instance_id: u32,
    pub subsystem_id: String,
    /// The recorded log this assignment replays.
    pub log: DistributedReplayLog,
    /// The registration whose factory will build the replay session.
    pub registration: DistributedReplayAppRegistration,
}

/// Validated output of `DistributedReplayBuilder::build`: everything needed
/// to start a replay engine.
#[derive(Debug, Clone)]
pub struct DistributedReplayPlan {
    pub multi_config_path: PathBuf,
    pub multi_config: MultiCopperConfig,
    /// Full discovery result, including failures, kept for diagnostics.
    pub catalog: DistributedReplayCatalog,
    /// Instance ids covered by this plan, sorted ascending.
    pub selected_instances: Vec<u32>,
    /// Mission shared by the selected logs, if any log recorded one.
    pub mission: Option<String>,
    /// Registrations sorted by subsystem id.
    pub registrations: Vec<DistributedReplayAppRegistration>,
    /// Assignments sorted by (instance, subsystem code, subsystem id).
    pub assignments: Vec<DistributedReplayAssignment>,
}
659
660impl DistributedReplayPlan {
661 #[inline]
662 pub fn builder(multi_config_path: impl AsRef<Path>) -> CuResult<DistributedReplayBuilder> {
663 DistributedReplayBuilder::new(multi_config_path)
664 }
665
666 #[inline]
667 pub fn assignment(
668 &self,
669 instance_id: u32,
670 subsystem_id: &str,
671 ) -> Option<&DistributedReplayAssignment> {
672 self.assignments.iter().find(|assignment| {
673 assignment.instance_id == instance_id && assignment.subsystem_id == subsystem_id
674 })
675 }
676
677 pub fn start(self) -> CuResult<DistributedReplayEngine> {
679 DistributedReplayEngine::new(self, DistributedReplaySessionConfig::default())
680 }
681
682 pub fn start_recording_logs_under(
684 self,
685 output_root: impl AsRef<Path>,
686 ) -> CuResult<DistributedReplayEngine> {
687 DistributedReplayEngine::new(
688 self,
689 DistributedReplaySessionConfig {
690 output_root: Some(output_root.as_ref().to_path_buf()),
691 },
692 )
693 }
694}
695
/// Accumulates every validation problem found while building a plan, so one
/// error can report all issues at once instead of failing on the first.
#[derive(Debug, Clone, Default)]
pub struct DistributedReplayValidationError {
    /// One human-readable line per problem.
    pub issues: Vec<String>,
}
701
702impl DistributedReplayValidationError {
703 fn push(&mut self, issue: impl Into<String>) {
704 self.issues.push(issue.into());
705 }
706
707 fn is_empty(&self) -> bool {
708 self.issues.is_empty()
709 }
710}
711
712impl Display for DistributedReplayValidationError {
713 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
714 writeln!(f, "Distributed replay validation failed:")?;
715 for issue in &self.issues {
716 writeln!(f, " - {issue}")?;
717 }
718 Ok(())
719 }
720}
721
/// Builder collecting configuration, discovered logs, app registrations, and
/// instance selection before validating everything into a plan.
#[derive(Debug, Clone)]
pub struct DistributedReplayBuilder {
    multi_config_path: PathBuf,
    multi_config: MultiCopperConfig,
    /// All paths passed to `discover_logs*`, accumulated across calls.
    discovery_inputs: Vec<PathBuf>,
    /// Discovery result; `None` until discovery runs or a catalog is supplied.
    catalog: Option<DistributedReplayCatalog>,
    /// App registrations keyed by subsystem id.
    registrations: BTreeMap<String, DistributedReplayAppRegistration>,
    /// Explicit instance selection; `None` means "all discovered instances".
    selected_instances: Option<BTreeSet<u32>>,
}
732
impl DistributedReplayBuilder {
    /// Loads and parses the multi-Copper configuration at `multi_config_path`.
    pub fn new(multi_config_path: impl AsRef<Path>) -> CuResult<Self> {
        let multi_config_path = multi_config_path.as_ref().to_path_buf();
        let multi_config = read_multi_configuration(&multi_config_path.to_string_lossy())?;
        Ok(Self {
            multi_config_path,
            multi_config,
            discovery_inputs: Vec::new(),
            catalog: None,
            registrations: BTreeMap::new(),
            selected_instances: None,
        })
    }

    /// Uses a pre-built catalog instead of discovering logs from paths.
    pub fn with_catalog(mut self, catalog: DistributedReplayCatalog) -> Self {
        self.catalog = Some(catalog);
        self
    }

    /// Adds `inputs` to the discovery set and re-runs discovery over ALL
    /// inputs accumulated so far, replacing any previous catalog.
    pub fn discover_logs<I, P>(mut self, inputs: I) -> CuResult<Self>
    where
        I: IntoIterator<Item = P>,
        P: AsRef<Path>,
    {
        self.discovery_inputs
            .extend(inputs.into_iter().map(|path| path.as_ref().to_path_buf()));
        self.catalog = Some(DistributedReplayCatalog::discover(
            self.discovery_inputs.iter().collect::<Vec<_>>(),
        )?);
        Ok(self)
    }

    /// Convenience wrapper for discovering logs under one root directory.
    pub fn discover_logs_under(self, root: impl AsRef<Path>) -> CuResult<Self> {
        self.discover_logs([root.as_ref().to_path_buf()])
    }

    /// Restricts the plan to the given instance ids, replacing any previous
    /// selection. Without this, all discovered instances are used.
    pub fn instances<I>(mut self, instances: I) -> Self
    where
        I: IntoIterator<Item = u32>,
    {
        self.selected_instances = Some(instances.into_iter().collect());
        self
    }

    /// Registers the application type `App` for `subsystem_id`.
    ///
    /// Validates that the subsystem exists in the multi-Copper config, that
    /// the app was generated for that subsystem id, and that its subsystem
    /// code matches the configuration.
    pub fn register<App>(mut self, subsystem_id: &str) -> CuResult<Self>
    where
        App: CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>
            + CuDistributedReplayApplication<MmapSectionStorage, UnifiedLoggerWrite>
            + 'static,
    {
        if self.registrations.contains_key(subsystem_id) {
            return Err(CuError::from(format!(
                "Subsystem '{}' is already registered for distributed replay",
                subsystem_id
            )));
        }

        let expected_subsystem = self.multi_config.subsystem(subsystem_id).ok_or_else(|| {
            CuError::from(format!(
                "Multi-Copper config '{}' does not define subsystem '{}'",
                self.multi_config_path.display(),
                subsystem_id
            ))
        })?;

        let registered_subsystem = App::subsystem();
        let Some(registered_subsystem_id) = registered_subsystem.id() else {
            return Err(CuError::from(format!(
                "App type '{}' was not generated for a multi-Copper subsystem and cannot be registered for distributed replay",
                type_name::<App>()
            )));
        };

        if registered_subsystem_id != subsystem_id {
            return Err(CuError::from(format!(
                "App type '{}' declares subsystem '{}' but was registered as '{}'",
                type_name::<App>(),
                registered_subsystem_id,
                subsystem_id
            )));
        }

        let registered_subsystem_code = registered_subsystem.code();
        if registered_subsystem_code != expected_subsystem.subsystem_code {
            return Err(CuError::from(format!(
                "App type '{}' declares subsystem code {} for '{}' but multi-Copper config '{}' expects {}",
                type_name::<App>(),
                registered_subsystem_code,
                subsystem_id,
                self.multi_config_path.display(),
                expected_subsystem.subsystem_code
            )));
        }

        self.registrations.insert(
            subsystem_id.to_string(),
            DistributedReplayAppRegistration {
                subsystem: registered_subsystem,
                app_type_name: type_name::<App>(),
                // Monomorphized factory erases App behind a plain fn pointer.
                session_factory: build_distributed_replay_session::<App>,
            },
        );
        Ok(self)
    }

    /// Validates everything collected so far and produces the final plan.
    ///
    /// All problems (discovery failures, missing registrations, missing or
    /// duplicate logs, unknown instances, mission mismatches) are accumulated
    /// and reported together in a single error.
    pub fn build(self) -> CuResult<DistributedReplayPlan> {
        let catalog = match self.catalog {
            Some(catalog) => catalog,
            None if self.discovery_inputs.is_empty() => DistributedReplayCatalog::default(),
            None => DistributedReplayCatalog::discover(
                self.discovery_inputs.iter().collect::<Vec<_>>(),
            )?,
        };

        let mut validation = DistributedReplayValidationError::default();

        // Discovery failures are treated as plan-blocking issues.
        for failure in &catalog.failures {
            validation.push(format!(
                "discovery failure for '{}': {}",
                failure.candidate_path.display(),
                failure.error
            ));
        }

        let subsystem_map: BTreeMap<_, _> = self
            .multi_config
            .subsystems
            .iter()
            .map(|subsystem| (subsystem.id.clone(), subsystem))
            .collect();

        // Every configured subsystem must have a registered app type.
        for subsystem in subsystem_map.keys() {
            if !self.registrations.contains_key(subsystem) {
                validation.push(format!(
                    "missing app registration for subsystem '{}'",
                    subsystem
                ));
            }
        }

        let mut discovered_instances = BTreeSet::new();
        let mut logs_by_target: BTreeMap<(u32, String), Vec<DistributedReplayLog>> =
            BTreeMap::new();

        // Bucket discovered logs by (instance, subsystem), validating each
        // against the multi-Copper configuration along the way.
        for log in &catalog.logs {
            let Some(subsystem_id) = log.subsystem_id() else {
                validation.push(format!(
                    "discovered log '{}' is missing subsystem_id runtime metadata",
                    log.base_path.display()
                ));
                continue;
            };

            let Some(expected_subsystem) = subsystem_map.get(subsystem_id) else {
                validation.push(format!(
                    "discovered log '{}' belongs to subsystem '{}' which is not present in multi-Copper config '{}'",
                    log.base_path.display(),
                    subsystem_id,
                    self.multi_config_path.display()
                ));
                continue;
            };

            if log.subsystem_code() != expected_subsystem.subsystem_code {
                validation.push(format!(
                    "discovered log '{}' reports subsystem code {} for '{}' but multi-Copper config '{}' expects {}",
                    log.base_path.display(),
                    log.subsystem_code(),
                    subsystem_id,
                    self.multi_config_path.display(),
                    expected_subsystem.subsystem_code
                ));
            }

            discovered_instances.insert(log.instance_id());
            logs_by_target
                .entry((log.instance_id(), subsystem_id.to_string()))
                .or_default()
                .push(log.clone());
        }

        // Exactly one log per (instance, subsystem) is allowed.
        for ((instance_id, subsystem_id), logs) in &logs_by_target {
            if logs.len() > 1 {
                validation.push(format!(
                    "found {} logs for instance {} subsystem '{}': {}",
                    logs.len(),
                    instance_id,
                    subsystem_id,
                    join_log_paths(logs)
                ));
            }
        }

        // Resolve the instance selection: explicit list (validated against
        // discovery) or everything discovered.
        let selected_instances: Vec<u32> =
            if let Some(selected_instances) = &self.selected_instances {
                let mut selected_instances: Vec<_> = selected_instances.iter().copied().collect();
                selected_instances.sort_unstable();
                for instance_id in &selected_instances {
                    if !discovered_instances.contains(instance_id) {
                        validation.push(format!(
                            "selected instance {} has no discovered logs",
                            instance_id
                        ));
                    }
                }
                selected_instances
            } else {
                discovered_instances.iter().copied().collect()
            };

        if selected_instances.is_empty() {
            validation.push("no instances selected for distributed replay");
        }

        // Every selected instance must have a log for every subsystem.
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                if !logs_by_target.contains_key(&(*instance_id, subsystem.id.clone())) {
                    validation.push(format!(
                        "missing log for instance {} subsystem '{}'",
                        instance_id, subsystem.id
                    ));
                }
            }
        }

        // All selected logs that recorded a mission must agree on it.
        let mut known_missions = BTreeSet::new();
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                if let Some(logs) = logs_by_target.get(&(*instance_id, subsystem.id.clone()))
                    && let Some(log) = logs.first()
                    && let Some(mission) = &log.mission
                {
                    known_missions.insert(mission.clone());
                }
            }
        }
        if known_missions.len() > 1 {
            validation.push(format!(
                "selected logs disagree on mission: {}",
                known_missions.into_iter().collect::<Vec<_>>().join(", ")
            ));
        }

        if !validation.is_empty() {
            return Err(CuError::from(validation.to_string()));
        }

        // First recorded mission among the selected logs (all agree by now).
        let mission = selected_instances
            .iter()
            .flat_map(|instance_id| {
                self.multi_config.subsystems.iter().filter_map(|subsystem| {
                    logs_by_target
                        .get(&(*instance_id, subsystem.id.clone()))
                        .and_then(|logs| logs.first())
                        .and_then(|log| log.mission.clone())
                })
            })
            .next();

        let mut registrations: Vec<_> = self.registrations.into_values().collect();
        registrations.sort_by(|left, right| left.subsystem.id().cmp(&right.subsystem.id()));

        // Materialize one assignment per (instance, subsystem); both lookups
        // are infallible after successful validation above.
        let mut assignments = Vec::new();
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                let log = logs_by_target
                    .get(&(*instance_id, subsystem.id.clone()))
                    .and_then(|logs| logs.first())
                    .expect("validated distributed replay plan is missing a log")
                    .clone();
                let registration = registrations
                    .iter()
                    .find(|registration| registration.subsystem.id() == Some(subsystem.id.as_str()))
                    .expect("validated distributed replay plan is missing a registration")
                    .clone();
                assignments.push(DistributedReplayAssignment {
                    instance_id: *instance_id,
                    subsystem_id: subsystem.id.clone(),
                    log,
                    registration,
                });
            }
        }
        assignments.sort_by(|left, right| {
            (
                left.instance_id,
                left.registration.subsystem.code(),
                left.subsystem_id.as_str(),
            )
                .cmp(&(
                    right.instance_id,
                    right.registration.subsystem.code(),
                    right.subsystem_id.as_str(),
                ))
        });

        Ok(DistributedReplayPlan {
            multi_config_path: self.multi_config_path,
            multi_config: self.multi_config,
            catalog,
            selected_instances,
            mission,
            registrations,
            assignments,
        })
    }
}
1049
/// Session factory for a concrete application type `App`; this is the
/// function stored (type-erased) in
/// `DistributedReplayAppRegistration::session_factory`.
///
/// Parses the recorded effective config, then builds the app either with a
/// real mmap-backed output logger (when `output_root` is configured) or with
/// a no-op logger, and finally indexes the recorded log into a session.
fn build_distributed_replay_session<App>(
    assignment: &DistributedReplayAssignment,
    session_config: &DistributedReplaySessionConfig,
) -> CuResult<DistributedReplaySessionBuild>
where
    App: CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>
        + CuDistributedReplayApplication<MmapSectionStorage, UnifiedLoggerWrite>
        + 'static,
{
    let config = read_configuration_str(assignment.log.effective_config_ron.clone(), None)
        .map_err(|err| {
            CuError::from(format!(
                "Failed to parse recorded effective config from '{}': {err}",
                assignment.log.base_path.display()
            ))
        })?;
    // Replay drives time manually through the mock half of the clock pair.
    let (clock, clock_mock) = RobotClock::mock();

    if let Some(output_root) = &session_config.output_root {
        // Recording variant: the replayed app writes a fresh unified log.
        let output_log_path = replay_output_log_path(output_root, assignment)?;
        let logger = build_replay_output_logger(
            &output_log_path,
            replay_output_log_size_bytes(assignment, &config),
        )?;
        let app = <App as CuDistributedReplayApplication<
            MmapSectionStorage,
            UnifiedLoggerWrite,
        >>::build_distributed_replay(
            clock.clone(), logger, assignment.instance_id, Some(config)
        )?;
        let mut session = RecordedReplaySession::<
            App,
            <App as CuRecordedReplayApplication<
                MmapSectionStorage,
                UnifiedLoggerWrite,
            >>::RecordedDataSet,
            MmapSectionStorage,
            UnifiedLoggerWrite,
        >::from_log(assignment.clone(), app, clock_mock, &assignment.log.base_path)?;
        let nodes = session.describe_nodes()?;
        return Ok(DistributedReplaySessionBuild {
            session: Box::new(session),
            nodes,
            output_log_path: Some(output_log_path),
        });
    }

    // Non-recording variant: all log writes are discarded.
    let logger = Arc::new(Mutex::new(NoopLogger::new()));
    let app = <App as CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>>::build_distributed_replay(
        clock,
        logger,
        assignment.instance_id,
        Some(config),
    )?;
    let mut session = RecordedReplaySession::<
        App,
        <App as CuRecordedReplayApplication<NoopSectionStorage, NoopLogger>>::RecordedDataSet,
        NoopSectionStorage,
        NoopLogger,
    >::from_log(
        assignment.clone(),
        app,
        clock_mock,
        &assignment.log.base_path,
    )?;
    let nodes = session.describe_nodes()?;
    Ok(DistributedReplaySessionBuild {
        session: Box::new(session),
        nodes,
        output_log_path: None,
    })
}
1122
1123fn replay_output_log_path(
1124 output_root: &Path,
1125 assignment: &DistributedReplayAssignment,
1126) -> CuResult<PathBuf> {
1127 let file_name = assignment
1128 .log
1129 .base_path
1130 .file_name()
1131 .ok_or_else(|| {
1132 CuError::from(format!(
1133 "Replay assignment log '{}' has no file name",
1134 assignment.log.base_path.display()
1135 ))
1136 })?
1137 .to_owned();
1138 Ok(output_root.join(file_name))
1139}
1140
1141fn build_replay_output_logger(
1142 path: &Path,
1143 preallocated_size: usize,
1144) -> CuResult<Arc<Mutex<UnifiedLoggerWrite>>> {
1145 if let Some(parent) = path.parent() {
1146 fs::create_dir_all(parent).map_err(|err| {
1147 CuError::new_with_cause(
1148 &format!(
1149 "Failed to create replay log directory '{}'",
1150 parent.display()
1151 ),
1152 err,
1153 )
1154 })?;
1155 }
1156 let UnifiedLogger::Write(writer) = UnifiedLoggerBuilder::new()
1157 .write(true)
1158 .create(true)
1159 .file_base_name(path)
1160 .preallocated_size(preallocated_size)
1161 .build()
1162 .map_err(|err| {
1163 CuError::new_with_cause(
1164 &format!("Failed to create replay log '{}'", path.display()),
1165 err,
1166 )
1167 })?
1168 else {
1169 return Err(CuError::from(format!(
1170 "Expected writable replay logger for '{}'",
1171 path.display()
1172 )));
1173 };
1174 Ok(Arc::new(Mutex::new(writer)))
1175}
1176
1177fn replay_output_log_size_bytes(
1178 assignment: &DistributedReplayAssignment,
1179 config: &crate::config::CuConfig,
1180) -> usize {
1181 if let Some(slab_zero) = slab_zero_path(&assignment.log.base_path)
1182 && let Ok(metadata) = fs::metadata(slab_zero)
1183 && let Ok(size) = usize::try_from(metadata.len())
1184 {
1185 return size.max(DEFAULT_REPLAY_LOG_SIZE_BYTES);
1186 }
1187
1188 config
1189 .logging
1190 .as_ref()
1191 .and_then(|logging| logging.slab_size_mib)
1192 .and_then(|size_mib| usize::try_from(size_mib).ok())
1193 .and_then(|size_mib| size_mib.checked_mul(1024 * 1024))
1194 .unwrap_or(DEFAULT_REPLAY_LOG_SIZE_BYTES)
1195}
1196
1197fn copperlist_origins<P: CopperListTuple>(
1198 copperlist: &CopperList<P>,
1199) -> BTreeSet<DistributedReplayOriginKey> {
1200 <CopperList<P> as ErasedCuStampedDataSet>::cumsgs(copperlist)
1201 .into_iter()
1202 .filter_map(|msg| msg.metadata().origin())
1203 .map(|origin| DistributedReplayOriginKey {
1204 instance_id: origin.instance_id,
1205 subsystem_code: origin.subsystem_code,
1206 cl_id: origin.cl_id,
1207 })
1208 .collect()
1209}
1210
/// Output of [`DistributedReplayEngine::build_state`]: every piece of engine
/// state that is rebuilt wholesale on construction and `reset`, minus the
/// per-run execution bookkeeping (`executed` / `executed_count`).
#[derive(Default)]
struct DistributedReplayEngineState {
    // One replay session per plan assignment, addressed by session index.
    sessions: Vec<Box<dyn DistributedReplaySession>>,
    // Dependency-graph nodes, one per recorded CopperList execution.
    nodes: Vec<DistributedReplayGraphNode>,
    // (instance_id, subsystem_id, cl_id) -> index into `nodes`.
    node_lookup: BTreeMap<(u32, String, u64), usize>,
    // (instance_id, subsystem_id) -> replay output log path, when one exists.
    output_log_paths: BTreeMap<(u32, String), PathBuf>,
    // Nodes whose dependencies are all satisfied and may execute next.
    ready: BTreeSet<DistributedReplayReadyNode>,
    // Last executed cursor per session (None until that session runs once).
    frontier: Vec<Option<DistributedReplayCursor>>,
}
1220
/// Causally ordered replay driver over multiple recorded subsystem logs.
///
/// Replays recorded CopperLists as a topological walk of the dependency
/// graph built from per-session order and recorded provenance, so that a
/// CopperList only executes after everything it depended on has executed.
pub struct DistributedReplayEngine {
    // Plan the engine was built from; retained so `reset` can rebuild state.
    plan: DistributedReplayPlan,
    // Session settings; also retained for `reset`.
    session_config: DistributedReplaySessionConfig,
    // One replay session per plan assignment, addressed by session index.
    sessions: Vec<Box<dyn DistributedReplaySession>>,
    // Dependency-graph nodes, one per recorded CopperList execution.
    nodes: Vec<DistributedReplayGraphNode>,
    // (instance_id, subsystem_id, cl_id) -> node index; used by `goto`.
    node_lookup: BTreeMap<(u32, String, u64), usize>,
    // (instance_id, subsystem_id) -> replay output log path, when one exists.
    output_log_paths: BTreeMap<(u32, String), PathBuf>,
    // Nodes whose dependencies are all satisfied and may execute next.
    ready: BTreeSet<DistributedReplayReadyNode>,
    // Last executed cursor per session (None until that session runs once).
    frontier: Vec<Option<DistributedReplayCursor>>,
    // Per-node executed flag, indexed like `nodes`.
    executed: Vec<bool>,
    // Number of executed nodes; distinguishes completion from deadlock.
    executed_count: usize,
}
1234
1235impl DistributedReplayEngine {
1236 fn new(
1237 plan: DistributedReplayPlan,
1238 session_config: DistributedReplaySessionConfig,
1239 ) -> CuResult<Self> {
1240 let state = Self::build_state(&plan, &session_config)?;
1241 let executed = vec![false; state.nodes.len()];
1242 Ok(Self {
1243 plan,
1244 session_config,
1245 sessions: state.sessions,
1246 nodes: state.nodes,
1247 node_lookup: state.node_lookup,
1248 output_log_paths: state.output_log_paths,
1249 ready: state.ready,
1250 frontier: state.frontier,
1251 executed,
1252 executed_count: 0,
1253 })
1254 }
1255
1256 fn build_state(
1257 plan: &DistributedReplayPlan,
1258 session_config: &DistributedReplaySessionConfig,
1259 ) -> CuResult<DistributedReplayEngineState> {
1260 let mut sessions = Vec::with_capacity(plan.assignments.len());
1261 let mut pending_nodes = Vec::new();
1262 let mut session_nodes = Vec::with_capacity(plan.assignments.len());
1263 let mut output_log_paths = BTreeMap::new();
1264
1265 for assignment in &plan.assignments {
1266 let build = (assignment.registration.session_factory)(assignment, session_config)?;
1267 let session_index = sessions.len();
1268 let mut node_indices = Vec::with_capacity(build.nodes.len());
1269 for node in build.nodes {
1270 let pending_index = pending_nodes.len();
1271 pending_nodes.push((session_index, node));
1272 node_indices.push(pending_index);
1273 }
1274 if let Some(output_log_path) = build.output_log_path {
1275 let replaced = output_log_paths.insert(
1276 (assignment.instance_id, assignment.subsystem_id.clone()),
1277 output_log_path,
1278 );
1279 if replaced.is_some() {
1280 return Err(CuError::from(format!(
1281 "Duplicate replay output log assignment for instance {} subsystem '{}'",
1282 assignment.instance_id, assignment.subsystem_id
1283 )));
1284 }
1285 }
1286 sessions.push(build.session);
1287 session_nodes.push(node_indices);
1288 }
1289
1290 let mut nodes = Vec::with_capacity(pending_nodes.len());
1291 let mut origin_lookup = BTreeMap::new();
1292 let mut node_lookup = BTreeMap::new();
1293
1294 for (node_index, (session_index, descriptor)) in pending_nodes.iter().enumerate() {
1295 if origin_lookup
1296 .insert(descriptor.origin_key.clone(), node_index)
1297 .is_some()
1298 {
1299 return Err(CuError::from(format!(
1300 "Duplicate replay node detected for instance {} subsystem code {} CopperList {}",
1301 descriptor.origin_key.instance_id,
1302 descriptor.origin_key.subsystem_code,
1303 descriptor.origin_key.cl_id
1304 )));
1305 }
1306
1307 if node_lookup
1308 .insert(
1309 (
1310 descriptor.cursor.instance_id,
1311 descriptor.cursor.subsystem_id.clone(),
1312 descriptor.cursor.cl_id,
1313 ),
1314 node_index,
1315 )
1316 .is_some()
1317 {
1318 return Err(CuError::from(format!(
1319 "Duplicate replay cursor detected for instance {} subsystem '{}' CopperList {}",
1320 descriptor.cursor.instance_id,
1321 descriptor.cursor.subsystem_id,
1322 descriptor.cursor.cl_id
1323 )));
1324 }
1325
1326 nodes.push(DistributedReplayGraphNode {
1327 cursor: descriptor.cursor.clone(),
1328 session_index: *session_index,
1329 outgoing: Vec::new(),
1330 initial_dependencies: 0,
1331 remaining_dependencies: 0,
1332 });
1333 }
1334
1335 let mut edges = BTreeSet::new();
1336
1337 for node_indices in &session_nodes {
1338 for pair in node_indices.windows(2) {
1339 let from = pair[0];
1340 let to = pair[1];
1341 if edges.insert((from, to)) {
1342 nodes[from].outgoing.push(to);
1343 nodes[to].initial_dependencies += 1;
1344 }
1345 }
1346 }
1347
1348 for (target_index, (_, descriptor)) in pending_nodes.iter().enumerate() {
1349 for origin in &descriptor.incoming_origins {
1350 let source_index = origin_lookup.get(origin).copied().ok_or_else(|| {
1351 CuError::from(format!(
1352 "Unresolved recorded provenance edge into instance {} subsystem '{}' CopperList {} from instance {} subsystem code {} CopperList {}",
1353 descriptor.cursor.instance_id,
1354 descriptor.cursor.subsystem_id,
1355 descriptor.cursor.cl_id,
1356 origin.instance_id,
1357 origin.subsystem_code,
1358 origin.cl_id
1359 ))
1360 })?;
1361 if source_index == target_index {
1362 return Err(CuError::from(format!(
1363 "Recorded provenance on instance {} subsystem '{}' CopperList {} points to itself",
1364 descriptor.cursor.instance_id,
1365 descriptor.cursor.subsystem_id,
1366 descriptor.cursor.cl_id
1367 )));
1368 }
1369 if edges.insert((source_index, target_index)) {
1370 nodes[source_index].outgoing.push(target_index);
1371 nodes[target_index].initial_dependencies += 1;
1372 }
1373 }
1374 }
1375
1376 let mut ready = BTreeSet::new();
1377 for (node_index, node) in nodes.iter_mut().enumerate() {
1378 node.remaining_dependencies = node.initial_dependencies;
1379 if node.remaining_dependencies == 0 {
1380 ready.insert(DistributedReplayReadyNode {
1381 instance_id: node.cursor.instance_id,
1382 subsystem_code: node.cursor.subsystem_code(),
1383 cl_id: node.cursor.cl_id,
1384 node_index,
1385 });
1386 }
1387 }
1388
1389 if !nodes.is_empty() && ready.is_empty() {
1390 return Err(CuError::from(
1391 "Distributed replay graph has no causally ready starting point",
1392 ));
1393 }
1394
1395 Ok(DistributedReplayEngineState {
1396 frontier: vec![None; sessions.len()],
1397 sessions,
1398 nodes,
1399 node_lookup,
1400 output_log_paths,
1401 ready,
1402 })
1403 }
1404
1405 fn shutdown_sessions(sessions: &mut Vec<Box<dyn DistributedReplaySession>>) -> CuResult<()> {
1406 for session in sessions.iter_mut() {
1407 session.shutdown()?;
1408 }
1409 Ok(())
1410 }
1411
1412 fn ready_key(&self, node_index: usize) -> DistributedReplayReadyNode {
1413 let node = &self.nodes[node_index];
1414 DistributedReplayReadyNode {
1415 instance_id: node.cursor.instance_id,
1416 subsystem_code: node.cursor.subsystem_code(),
1417 cl_id: node.cursor.cl_id,
1418 node_index,
1419 }
1420 }
1421
1422 pub fn reset(&mut self) -> CuResult<()> {
1424 Self::shutdown_sessions(&mut self.sessions)?;
1425 let state = Self::build_state(&self.plan, &self.session_config)?;
1426 self.sessions = state.sessions;
1427 self.nodes = state.nodes;
1428 self.node_lookup = state.node_lookup;
1429 self.output_log_paths = state.output_log_paths;
1430 self.ready = state.ready;
1431 self.frontier = state.frontier;
1432 self.executed = vec![false; self.nodes.len()];
1433 self.executed_count = 0;
1434 Ok(())
1435 }
1436
1437 pub fn step_causal(&mut self) -> CuResult<Option<DistributedReplayCursor>> {
1439 let Some(next_ready) = self.ready.iter().next().copied() else {
1440 if self.executed_count == self.nodes.len() {
1441 return Ok(None);
1442 }
1443 return Err(CuError::from(
1444 "Distributed replay is deadlocked: no causally ready CopperList remains",
1445 ));
1446 };
1447 self.ready.remove(&next_ready);
1448
1449 let cursor = self.nodes[next_ready.node_index].cursor.clone();
1450 let session_index = self.nodes[next_ready.node_index].session_index;
1451 self.sessions[session_index].goto_cl(cursor.cl_id)?;
1452 self.executed[next_ready.node_index] = true;
1453 self.executed_count += 1;
1454 self.frontier[session_index] = Some(cursor.clone());
1455
1456 let outgoing = self.nodes[next_ready.node_index].outgoing.clone();
1457 for dependent in outgoing {
1458 let node = &mut self.nodes[dependent];
1459 node.remaining_dependencies = node.remaining_dependencies.saturating_sub(1);
1460 if node.remaining_dependencies == 0 {
1461 self.ready.insert(self.ready_key(dependent));
1462 }
1463 }
1464
1465 Ok(Some(cursor))
1466 }
1467
1468 pub fn run_all(&mut self) -> CuResult<()> {
1470 while self.step_causal()?.is_some() {}
1471 Ok(())
1472 }
1473
1474 pub fn goto(&mut self, instance_id: u32, subsystem_id: &str, cl_id: u64) -> CuResult<()> {
1476 let target = self
1477 .node_lookup
1478 .get(&(instance_id, subsystem_id.to_string(), cl_id))
1479 .copied()
1480 .ok_or_else(|| {
1481 CuError::from(format!(
1482 "Distributed replay target instance {} subsystem '{}' CopperList {} does not exist",
1483 instance_id, subsystem_id, cl_id
1484 ))
1485 })?;
1486 self.reset()?;
1487 while !self.executed[target] {
1488 let Some(_) = self.step_causal()? else {
1489 return Err(CuError::from(format!(
1490 "Distributed replay exhausted before reaching instance {} subsystem '{}' CopperList {}",
1491 instance_id, subsystem_id, cl_id
1492 )));
1493 };
1494 }
1495 Ok(())
1496 }
1497
1498 pub fn current_frontier(&self) -> Vec<DistributedReplayCursor> {
1500 self.frontier
1501 .iter()
1502 .filter_map(|cursor| cursor.clone())
1503 .collect()
1504 }
1505
1506 pub fn output_log_path(&self, instance_id: u32, subsystem_id: &str) -> Option<&Path> {
1507 self.output_log_paths
1508 .get(&(instance_id, subsystem_id.to_string()))
1509 .map(PathBuf::as_path)
1510 }
1511
1512 #[inline]
1513 pub fn total_nodes(&self) -> usize {
1514 self.nodes.len()
1515 }
1516
1517 #[inline]
1518 pub fn executed_nodes(&self) -> usize {
1519 self.executed_count
1520 }
1521}
1522
1523fn join_log_paths(logs: &[DistributedReplayLog]) -> String {
1524 logs.iter()
1525 .map(|log| log.base_path.display().to_string())
1526 .collect::<Vec<_>>()
1527 .join(", ")
1528}
1529
1530fn collect_candidate_base_paths(path: &Path, out: &mut BTreeSet<PathBuf>) -> CuResult<()> {
1531 if path.is_dir() {
1532 let mut entries = fs::read_dir(path)
1533 .map_err(|err| {
1534 CuError::new_with_cause(
1535 &format!(
1536 "Failed to read directory '{}' during distributed replay discovery",
1537 path.display()
1538 ),
1539 err,
1540 )
1541 })?
1542 .collect::<Result<Vec<_>, _>>()
1543 .map_err(|err| {
1544 CuError::new_with_cause(
1545 &format!(
1546 "Failed to enumerate directory '{}' during distributed replay discovery",
1547 path.display()
1548 ),
1549 err,
1550 )
1551 })?;
1552 entries.sort_by_key(|entry| entry.path());
1553 for entry in entries {
1554 collect_candidate_base_paths(&entry.path(), out)?;
1555 }
1556 return Ok(());
1557 }
1558
1559 if path
1560 .extension()
1561 .and_then(|ext| ext.to_str())
1562 .is_some_and(|ext| ext == "copper")
1563 {
1564 out.insert(normalize_candidate_log_base(path));
1565 }
1566
1567 Ok(())
1568}
1569
1570fn normalize_candidate_log_base(path: &Path) -> PathBuf {
1571 let Some(extension) = path.extension().and_then(|ext| ext.to_str()) else {
1572 return path.to_path_buf();
1573 };
1574 let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
1575 return path.to_path_buf();
1576 };
1577 let Some((base_stem, slab_suffix)) = stem.rsplit_once('_') else {
1578 return path.to_path_buf();
1579 };
1580
1581 if slab_suffix.is_empty() || !slab_suffix.chars().all(|c| c.is_ascii_digit()) {
1582 return path.to_path_buf();
1583 }
1584
1585 let mut normalized = path.to_path_buf();
1586 normalized.set_file_name(format!("{base_stem}.{extension}"));
1587 if slab_zero_path(&normalized).is_some_and(|slab_zero| slab_zero.exists()) {
1588 normalized
1589 } else {
1590 path.to_path_buf()
1591 }
1592}
1593
/// Derives the first-slab path for a base log path: `dir/stem.ext` becomes
/// `dir/stem_0.ext`. Returns `None` when the path lacks a UTF-8 stem or
/// extension.
fn slab_zero_path(base_path: &Path) -> Option<PathBuf> {
    let stem = base_path.file_stem()?.to_str()?;
    let extension = base_path.extension()?.to_str()?;
    let mut slab_zero = base_path.to_path_buf();
    slab_zero.set_file_name(format!("{stem}_0.{extension}"));
    Some(slab_zero)
}
1601
1602fn read_next_entry<T: bincode::Decode<()>>(src: &mut impl Read) -> CuResult<Option<T>> {
1603 match decode_from_std_read::<T, _, _>(src, standard()) {
1604 Ok(entry) => Ok(Some(entry)),
1605 Err(DecodeError::UnexpectedEnd { .. }) => Ok(None),
1606 Err(DecodeError::Io { inner, .. }) if inner.kind() == std::io::ErrorKind::UnexpectedEof => {
1607 Ok(None)
1608 }
1609 Err(err) => Err(CuError::new_with_cause(
1610 "Failed to decode bincode entry during distributed replay discovery",
1611 err,
1612 )),
1613 }
1614}
1615
1616#[cfg(test)]
1617mod tests {
1618 use super::*;
1619 use crate::app::{
1620 CuDistributedReplayApplication, CuRecordedReplayApplication, CuSimApplication,
1621 CuSubsystemMetadata,
1622 };
1623 use crate::config::CuConfig;
1624 use crate::copperlist::CopperList;
1625 use crate::curuntime::KeyFrame;
1626 use crate::simulation::SimOverride;
1627 use bincode::{Decode, Encode};
1628 use cu29_clock::CuTime;
1629 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks, WriteStream};
1630 use cu29_unifiedlog::memmap::MmapSectionStorage;
1631 use cu29_unifiedlog::stream_write;
1632 use serde::Serialize;
1633 use std::sync::{Arc, Mutex};
1634 use tempfile::TempDir;
1635
    /// Writes a minimal unified log at `base_path` containing an
    /// `Instantiated` lifecycle record carrying `stack`, and optionally a
    /// `MissionStarted` record one nanosecond later.
    fn write_runtime_lifecycle_log(
        base_path: &Path,
        stack: RuntimeLifecycleStackInfo,
        mission: Option<&str>,
    ) -> CuResult<()> {
        if let Some(parent) = base_path.parent() {
            fs::create_dir_all(parent).map_err(|err| {
                CuError::new_with_cause(
                    &format!("Failed to create test log directory '{}'", parent.display()),
                    err,
                )
            })?;
        }

        // Small preallocation — this fixture only holds a couple of records.
        let UnifiedLogger::Write(writer) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .preallocated_size(256 * 1024)
            .file_base_name(base_path)
            .build()
            .map_err(|err| {
                CuError::new_with_cause(
                    &format!("Failed to create test log '{}'", base_path.display()),
                    err,
                )
            })?
        else {
            return Err(CuError::from("Expected writable unified logger in test"));
        };

        let logger = Arc::new(Mutex::new(writer));
        let mut stream = stream_write::<RuntimeLifecycleRecord, MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::RuntimeLifecycle,
            4096,
        )?;
        // First record: instantiation with the identity under test.
        stream.log(&RuntimeLifecycleRecord {
            timestamp: CuTime::default(),
            event: RuntimeLifecycleEvent::Instantiated {
                config_source: RuntimeLifecycleConfigSource::ExternalFile,
                effective_config_ron: "(runtime: ())".to_string(),
                stack,
            },
        })?;
        if let Some(mission) = mission {
            // Optional second record: mission start at t = 1ns.
            stream.log(&RuntimeLifecycleRecord {
                timestamp: CuTime::from_nanos(1),
                event: RuntimeLifecycleEvent::MissionStarted {
                    mission: mission.to_string(),
                },
            })?;
        }
        // Drop the stream before the logger, then the logger itself, so the
        // log file is fully closed before the caller reads it back.
        drop(stream);
        drop(logger);
        Ok(())
    }
1692
1693 fn test_stack(
1694 subsystem_id: &str,
1695 subsystem_code: u16,
1696 instance_id: u32,
1697 ) -> RuntimeLifecycleStackInfo {
1698 RuntimeLifecycleStackInfo {
1699 app_name: "demo".to_string(),
1700 app_version: "0.1.0".to_string(),
1701 git_commit: Some("abc123".to_string()),
1702 git_dirty: Some(false),
1703 subsystem_id: Some(subsystem_id.to_string()),
1704 subsystem_code,
1705 instance_id,
1706 }
1707 }
1708
    /// Writes one empty per-subsystem RON config per entry of `subsystem_ids`
    /// plus a multi-Copper config referencing them by relative path; returns
    /// the multi-config path.
    fn write_multi_config_fixture(temp_dir: &TempDir, subsystem_ids: &[&str]) -> CuResult<PathBuf> {
        // One trivial (no tasks, no connections) Copper config per subsystem.
        for subsystem_id in subsystem_ids {
            let subsystem_config = temp_dir.path().join(format!("{subsystem_id}_config.ron"));
            fs::write(&subsystem_config, "(tasks: [], cnx: [])").map_err(|err| {
                CuError::new_with_cause(
                    &format!(
                        "Failed to write subsystem config '{}'",
                        subsystem_config.display()
                    ),
                    err,
                )
            })?;
        }

        // RON subsystem entries pointing at the files written above.
        let subsystem_entries = subsystem_ids
            .iter()
            .map(|subsystem_id| {
                format!(
                    r#"(
            id: "{subsystem_id}",
            config: "{subsystem_id}_config.ron",
        )"#
                )
            })
            .collect::<Vec<_>>()
            .join(",\n");

        let multi_config = format!(
            "(\n    subsystems: [\n{entries}\n    ],\n    interconnects: [],\n)\n",
            entries = subsystem_entries
        );
        let multi_config_path = temp_dir.path().join("multi_copper.ron");
        fs::write(&multi_config_path, multi_config).map_err(|err| {
            CuError::new_with_cause(
                &format!(
                    "Failed to write multi-Copper config '{}'",
                    multi_config_path.display()
                ),
                err,
            )
        })?;
        Ok(multi_config_path)
    }
1752
    /// Empty recorded payload for tests that don't care about message content.
    #[derive(Debug, Default, Encode, Decode, Serialize)]
    struct DummyRecordedDataSet;
1755
1756 impl ErasedCuStampedDataSet for DummyRecordedDataSet {
1757 fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
1758 Vec::new()
1759 }
1760 }
1761
    impl MatchingTasks for DummyRecordedDataSet {
        // No tasks: the dummy data set matches an empty task graph.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }
1767
    /// Generates a zero-sized test application type bound to the given
    /// subsystem id and code, implementing `CuSubsystemMetadata`,
    /// `CuSimApplication`, `CuRecordedReplayApplication`, and
    /// `CuDistributedReplayApplication` with no-op bodies.
    macro_rules! impl_registered_test_app {
        ($name:ident, $subsystem_id:expr, $subsystem_code:expr) => {
            struct $name;

            impl CuSubsystemMetadata for $name {
                fn subsystem() -> Subsystem {
                    Subsystem::new(Some($subsystem_id), $subsystem_code)
                }
            }

            // Simulation application: every lifecycle hook succeeds without
            // doing any work.
            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuSimApplication<S, L> for $name
            {
                type Step<'z> = ();

                fn get_original_config() -> String {
                    "(tasks: [], cnx: [])".to_string()
                }

                fn start_all_tasks(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn run_one_iteration(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn run(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn stop_all_tasks(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn restore_keyframe(&mut self, _freezer: &KeyFrame) -> CuResult<()> {
                    Ok(())
                }
            }

            // Recorded replay: accepts every CopperList without side effects.
            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuRecordedReplayApplication<S, L> for $name
            {
                type RecordedDataSet = DummyRecordedDataSet;

                fn replay_recorded_copperlist(
                    &mut self,
                    _clock_mock: &RobotClockMock,
                    _copperlist: &CopperList<Self::RecordedDataSet>,
                    _keyframe: Option<&KeyFrame>,
                ) -> CuResult<()> {
                    Ok(())
                }
            }

            // Distributed replay: constructible from any clock/logger pair.
            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuDistributedReplayApplication<S, L> for $name
            {
                fn build_distributed_replay(
                    _clock: RobotClock,
                    _unified_logger: Arc<Mutex<L>>,
                    _instance_id: u32,
                    _config_override: Option<CuConfig>,
                ) -> CuResult<Self> {
                    Ok(Self)
                }
            }
        };
    }
1849
    // Concrete replay-capable test apps: ping (code 0) and pong (code 1),
    // plus a ping variant with a deliberately mismatched code (99) for
    // negative tests.
    impl_registered_test_app!(PingRegisteredApp, "ping", 0);
    impl_registered_test_app!(PongRegisteredApp, "pong", 1);
    impl_registered_test_app!(PingWrongCodeApp, "ping", 99);
1853
    /// No-op replay session used to exercise the engine's graph logic
    /// without touching any real log on disk.
    struct FakeReplaySession;

    impl DistributedReplaySession for FakeReplaySession {
        // Pretend the requested CopperList replayed successfully.
        fn goto_cl(&mut self, _cl_id: u64) -> CuResult<()> {
            Ok(())
        }

        fn shutdown(&mut self) -> CuResult<()> {
            Ok(())
        }
    }
1865
1866 fn fake_registration(
1867 subsystem_id: &'static str,
1868 subsystem_code: u16,
1869 session_factory: DistributedReplaySessionFactory,
1870 ) -> DistributedReplayAppRegistration {
1871 DistributedReplayAppRegistration {
1872 subsystem: Subsystem::new(Some(subsystem_id), subsystem_code),
1873 app_type_name: "fake",
1874 session_factory,
1875 }
1876 }
1877
1878 fn fake_assignment(
1879 instance_id: u32,
1880 subsystem_id: &'static str,
1881 subsystem_code: u16,
1882 session_factory: DistributedReplaySessionFactory,
1883 ) -> DistributedReplayAssignment {
1884 DistributedReplayAssignment {
1885 instance_id,
1886 subsystem_id: subsystem_id.to_string(),
1887 log: DistributedReplayLog {
1888 base_path: PathBuf::from(format!("{subsystem_id}_{instance_id}.copper")),
1889 stack: test_stack(subsystem_id, subsystem_code, instance_id),
1890 config_source: RuntimeLifecycleConfigSource::ExternalFile,
1891 effective_config_ron: "(tasks: [], cnx: [])".to_string(),
1892 mission: Some("default".to_string()),
1893 },
1894 registration: fake_registration(subsystem_id, subsystem_code, session_factory),
1895 }
1896 }
1897
1898 fn fake_plan(assignments: Vec<DistributedReplayAssignment>) -> DistributedReplayPlan {
1899 let mut registrations: Vec<_> = assignments
1900 .iter()
1901 .map(|assignment| assignment.registration.clone())
1902 .collect();
1903 registrations.sort_by(|left, right| left.subsystem.id().cmp(&right.subsystem.id()));
1904 let mut selected_instances: Vec<_> = assignments
1905 .iter()
1906 .map(|assignment| assignment.instance_id)
1907 .collect::<BTreeSet<_>>()
1908 .into_iter()
1909 .collect();
1910 selected_instances.sort_unstable();
1911 DistributedReplayPlan {
1912 multi_config_path: PathBuf::from("fake_multi.ron"),
1913 multi_config: MultiCopperConfig {
1914 subsystems: Vec::new(),
1915 interconnects: Vec::new(),
1916 instance_overrides_root: None,
1917 },
1918 catalog: DistributedReplayCatalog::default(),
1919 selected_instances,
1920 mission: Some("default".to_string()),
1921 registrations,
1922 assignments,
1923 }
1924 }
1925
1926 fn fake_ping_session(
1927 assignment: &DistributedReplayAssignment,
1928 _session_config: &DistributedReplaySessionConfig,
1929 ) -> CuResult<DistributedReplaySessionBuild> {
1930 Ok(DistributedReplaySessionBuild {
1931 session: Box::new(FakeReplaySession),
1932 nodes: vec![
1933 DistributedReplayNodeDescriptor {
1934 cursor: DistributedReplayCursor::new(
1935 assignment.instance_id,
1936 assignment.subsystem_id.clone(),
1937 assignment.log.subsystem_code(),
1938 0,
1939 ),
1940 origin_key: DistributedReplayOriginKey {
1941 instance_id: assignment.instance_id,
1942 subsystem_code: assignment.log.subsystem_code(),
1943 cl_id: 0,
1944 },
1945 incoming_origins: BTreeSet::new(),
1946 },
1947 DistributedReplayNodeDescriptor {
1948 cursor: DistributedReplayCursor::new(
1949 assignment.instance_id,
1950 assignment.subsystem_id.clone(),
1951 assignment.log.subsystem_code(),
1952 1,
1953 ),
1954 origin_key: DistributedReplayOriginKey {
1955 instance_id: assignment.instance_id,
1956 subsystem_code: assignment.log.subsystem_code(),
1957 cl_id: 1,
1958 },
1959 incoming_origins: BTreeSet::new(),
1960 },
1961 ],
1962 output_log_path: None,
1963 })
1964 }
1965
1966 fn fake_pong_session(
1967 assignment: &DistributedReplayAssignment,
1968 _session_config: &DistributedReplaySessionConfig,
1969 ) -> CuResult<DistributedReplaySessionBuild> {
1970 Ok(DistributedReplaySessionBuild {
1971 session: Box::new(FakeReplaySession),
1972 nodes: vec![
1973 DistributedReplayNodeDescriptor {
1974 cursor: DistributedReplayCursor::new(
1975 assignment.instance_id,
1976 assignment.subsystem_id.clone(),
1977 assignment.log.subsystem_code(),
1978 0,
1979 ),
1980 origin_key: DistributedReplayOriginKey {
1981 instance_id: assignment.instance_id,
1982 subsystem_code: assignment.log.subsystem_code(),
1983 cl_id: 0,
1984 },
1985 incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
1986 instance_id: assignment.instance_id,
1987 subsystem_code: 0,
1988 cl_id: 0,
1989 }]),
1990 },
1991 DistributedReplayNodeDescriptor {
1992 cursor: DistributedReplayCursor::new(
1993 assignment.instance_id,
1994 assignment.subsystem_id.clone(),
1995 assignment.log.subsystem_code(),
1996 1,
1997 ),
1998 origin_key: DistributedReplayOriginKey {
1999 instance_id: assignment.instance_id,
2000 subsystem_code: assignment.log.subsystem_code(),
2001 cl_id: 1,
2002 },
2003 incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
2004 instance_id: assignment.instance_id,
2005 subsystem_code: 0,
2006 cl_id: 1,
2007 }]),
2008 },
2009 ],
2010 output_log_path: None,
2011 })
2012 }
2013
2014 fn fake_bad_pong_session(
2015 assignment: &DistributedReplayAssignment,
2016 _session_config: &DistributedReplaySessionConfig,
2017 ) -> CuResult<DistributedReplaySessionBuild> {
2018 Ok(DistributedReplaySessionBuild {
2019 session: Box::new(FakeReplaySession),
2020 nodes: vec![DistributedReplayNodeDescriptor {
2021 cursor: DistributedReplayCursor::new(
2022 assignment.instance_id,
2023 assignment.subsystem_id.clone(),
2024 assignment.log.subsystem_code(),
2025 0,
2026 ),
2027 origin_key: DistributedReplayOriginKey {
2028 instance_id: assignment.instance_id,
2029 subsystem_code: assignment.log.subsystem_code(),
2030 cl_id: 0,
2031 },
2032 incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
2033 instance_id: assignment.instance_id,
2034 subsystem_code: 0,
2035 cl_id: 99,
2036 }]),
2037 }],
2038 output_log_path: None,
2039 })
2040 }
2041
    // Synthetic pipeline for the stress fixtures: sense -> plan -> control,
    // with telemetry depending on both sense and control (see
    // `stress_origins_for` for the exact provenance wiring).
    const STRESS_SUBSYSTEMS: [(&str, u16); 4] =
        [("sense", 0), ("plan", 1), ("control", 2), ("telemetry", 3)];
2044
2045 fn stress_origins_for(
2046 subsystem_id: &str,
2047 instance_id: u32,
2048 cl_id: u64,
2049 ) -> BTreeSet<DistributedReplayOriginKey> {
2050 match subsystem_id {
2051 "sense" => BTreeSet::new(),
2052 "plan" => BTreeSet::from([DistributedReplayOriginKey {
2053 instance_id,
2054 subsystem_code: 0,
2055 cl_id,
2056 }]),
2057 "control" => BTreeSet::from([DistributedReplayOriginKey {
2058 instance_id,
2059 subsystem_code: 1,
2060 cl_id,
2061 }]),
2062 "telemetry" => BTreeSet::from([
2063 DistributedReplayOriginKey {
2064 instance_id,
2065 subsystem_code: 0,
2066 cl_id,
2067 },
2068 DistributedReplayOriginKey {
2069 instance_id,
2070 subsystem_code: 2,
2071 cl_id,
2072 },
2073 ]),
2074 _ => panic!("unexpected synthetic stress subsystem '{subsystem_id}'"),
2075 }
2076 }
2077
2078 fn build_stress_session(
2079 assignment: &DistributedReplayAssignment,
2080 _session_config: &DistributedReplaySessionConfig,
2081 cl_count: u64,
2082 ) -> CuResult<DistributedReplaySessionBuild> {
2083 let subsystem_code = assignment.log.subsystem_code();
2084 let nodes = (0..cl_count)
2085 .map(|cl_id| DistributedReplayNodeDescriptor {
2086 cursor: DistributedReplayCursor::new(
2087 assignment.instance_id,
2088 assignment.subsystem_id.clone(),
2089 subsystem_code,
2090 cl_id,
2091 ),
2092 origin_key: DistributedReplayOriginKey {
2093 instance_id: assignment.instance_id,
2094 subsystem_code,
2095 cl_id,
2096 },
2097 incoming_origins: stress_origins_for(
2098 &assignment.subsystem_id,
2099 assignment.instance_id,
2100 cl_id,
2101 ),
2102 })
2103 .collect();
2104 Ok(DistributedReplaySessionBuild {
2105 session: Box::new(FakeReplaySession),
2106 nodes,
2107 output_log_path: None,
2108 })
2109 }
2110
    /// Stress session factory sized at 24 CopperLists per subsystem.
    fn stress_session_ci(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 24)
    }

    /// Stress session factory sized at 32 CopperLists per subsystem.
    fn stress_session_goto(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 32)
    }

    /// Stress session factory sized at 96 CopperLists per subsystem.
    fn stress_session_heavy(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 96)
    }
2131
2132 fn stress_plan(
2133 instance_count: u32,
2134 session_factory: DistributedReplaySessionFactory,
2135 ) -> DistributedReplayPlan {
2136 let assignments = (1..=instance_count)
2137 .flat_map(|instance_id| {
2138 STRESS_SUBSYSTEMS
2139 .into_iter()
2140 .map(move |(subsystem_id, subsystem_code)| {
2141 fake_assignment(instance_id, subsystem_id, subsystem_code, session_factory)
2142 })
2143 })
2144 .collect();
2145 fake_plan(assignments)
2146 }
2147
2148 fn collect_engine_order(
2149 engine: &mut DistributedReplayEngine,
2150 ) -> CuResult<Vec<DistributedReplayCursor>> {
2151 let mut order = Vec::new();
2152 while let Some(cursor) = engine.step_causal()? {
2153 order.push(cursor);
2154 }
2155 Ok(order)
2156 }
2157
    /// Verifies that `order` is a valid topological execution of the stress
    /// graph: every node appears exactly once, CopperLists of one subsystem
    /// run in ascending order, and sense < plan < control and
    /// {sense, control} < telemetry hold per cl id.
    fn assert_stress_order_is_topological(
        order: &[DistributedReplayCursor],
        instance_count: u32,
        cl_count: u64,
    ) {
        // Every (instance, subsystem, cl) triple must execute exactly once.
        let expected_len = instance_count as usize * STRESS_SUBSYSTEMS.len() * cl_count as usize;
        assert_eq!(order.len(), expected_len);

        // Map each triple to its position in the execution order.
        let positions: BTreeMap<_, _> = order
            .iter()
            .enumerate()
            .map(|(idx, cursor)| {
                (
                    (
                        cursor.instance_id,
                        cursor.subsystem_id.clone(),
                        cursor.cl_id,
                    ),
                    idx,
                )
            })
            .collect();
        // Equal length proves there were no duplicate cursors.
        assert_eq!(positions.len(), expected_len);

        for instance_id in 1..=instance_count {
            // Local order: cl N-1 must run before cl N within a subsystem.
            for (subsystem_id, _) in STRESS_SUBSYSTEMS {
                for cl_id in 1..cl_count {
                    let previous = positions
                        .get(&(instance_id, subsystem_id.to_string(), cl_id - 1))
                        .expect("previous local node missing");
                    let current = positions
                        .get(&(instance_id, subsystem_id.to_string(), cl_id))
                        .expect("current local node missing");
                    assert!(
                        previous < current,
                        "local order violated for instance {instance_id} subsystem '{subsystem_id}' cl {cl_id}"
                    );
                }
            }

            // Cross-subsystem order must match the provenance wiring of
            // `stress_origins_for`.
            for cl_id in 0..cl_count {
                let sense = positions
                    .get(&(instance_id, "sense".to_string(), cl_id))
                    .expect("sense node missing");
                let plan = positions
                    .get(&(instance_id, "plan".to_string(), cl_id))
                    .expect("plan node missing");
                let control = positions
                    .get(&(instance_id, "control".to_string(), cl_id))
                    .expect("control node missing");
                let telemetry = positions
                    .get(&(instance_id, "telemetry".to_string(), cl_id))
                    .expect("telemetry node missing");
                assert!(sense < plan);
                assert!(plan < control);
                assert!(sense < telemetry);
                assert!(control < telemetry);
            }
        }
    }
2218
2219 #[test]
2220 fn discovers_single_log_identity_from_runtime_lifecycle() -> CuResult<()> {
2221 let temp_dir = TempDir::new()
2222 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2223 let base_path = temp_dir.path().join("logs/ping.copper");
2224 write_runtime_lifecycle_log(&base_path, test_stack("ping", 7, 42), Some("default"))?;
2225
2226 let discovered = DistributedReplayLog::discover(&base_path)?;
2227 assert_eq!(discovered.base_path, base_path);
2228 assert_eq!(discovered.subsystem_id(), Some("ping"));
2229 assert_eq!(discovered.subsystem_code(), 7);
2230 assert_eq!(discovered.instance_id(), 42);
2231 assert_eq!(discovered.mission.as_deref(), Some("default"));
2232 Ok(())
2233 }
2234
2235 #[test]
2236 fn catalog_discovery_normalizes_slab_paths_and_deduplicates_candidates() -> CuResult<()> {
2237 let temp_dir = TempDir::new()
2238 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2239 let base_path = temp_dir.path().join("logs/pong.copper");
2240 let slab_zero_path = temp_dir.path().join("logs/pong_0.copper");
2241 write_runtime_lifecycle_log(&base_path, test_stack("pong", 3, 9), Some("default"))?;
2242
2243 let catalog = DistributedReplayCatalog::discover([base_path.clone(), slab_zero_path])?;
2244 assert!(
2245 catalog.failures.is_empty(),
2246 "unexpected failures: {:?}",
2247 catalog.failures
2248 );
2249 assert_eq!(catalog.logs.len(), 1);
2250 assert_eq!(catalog.logs[0].base_path, base_path);
2251 assert_eq!(catalog.logs[0].subsystem_id(), Some("pong"));
2252 Ok(())
2253 }
2254
2255 #[test]
2256 fn catalog_discovery_walks_directories_using_physical_slab_files() -> CuResult<()> {
2257 let temp_dir = TempDir::new()
2258 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2259 let ping_base = temp_dir.path().join("logs/ping.copper");
2260 let pong_base = temp_dir.path().join("logs/pong.copper");
2261 write_runtime_lifecycle_log(&ping_base, test_stack("ping", 0, 1), Some("alpha"))?;
2262 write_runtime_lifecycle_log(&pong_base, test_stack("pong", 1, 1), Some("alpha"))?;
2263
2264 let catalog = DistributedReplayCatalog::discover_under(temp_dir.path())?;
2265 assert!(
2266 catalog.failures.is_empty(),
2267 "unexpected failures: {:?}",
2268 catalog.failures
2269 );
2270 assert_eq!(catalog.logs.len(), 2);
2271 assert_eq!(catalog.logs[0].subsystem_id(), Some("ping"));
2272 assert_eq!(catalog.logs[1].subsystem_id(), Some("pong"));
2273 assert_eq!(catalog.logs[0].base_path, ping_base);
2274 assert_eq!(catalog.logs[1].base_path, pong_base);
2275 Ok(())
2276 }
2277
2278 #[test]
2279 fn catalog_reports_invalid_logs_without_aborting_scan() -> CuResult<()> {
2280 let temp_dir = TempDir::new()
2281 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2282 let good_base = temp_dir.path().join("logs/good.copper");
2283 write_runtime_lifecycle_log(&good_base, test_stack("good", 2, 5), Some("beta"))?;
2284
2285 let bad_slab = temp_dir.path().join("logs/bad_0.copper");
2286 if let Some(parent) = bad_slab.parent() {
2287 fs::create_dir_all(parent).map_err(|err| {
2288 CuError::new_with_cause(
2289 &format!("Failed to create bad log dir '{}'", parent.display()),
2290 err,
2291 )
2292 })?;
2293 }
2294 fs::write(&bad_slab, b"not a copper log").map_err(|err| {
2295 CuError::new_with_cause(
2296 &format!("Failed to create bad log '{}'", bad_slab.display()),
2297 err,
2298 )
2299 })?;
2300
2301 let catalog = DistributedReplayCatalog::discover_under(temp_dir.path())?;
2302 assert_eq!(catalog.logs.len(), 1);
2303 assert_eq!(catalog.failures.len(), 1);
2304 assert_eq!(catalog.logs[0].subsystem_id(), Some("good"));
2305 assert_eq!(
2306 catalog.failures[0].candidate_path,
2307 temp_dir.path().join("logs/bad.copper")
2308 );
2309 Ok(())
2310 }
2311
2312 #[test]
2313 fn builder_builds_validated_plan_for_selected_instances() -> CuResult<()> {
2314 let temp_dir = TempDir::new()
2315 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2316 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2317 let logs_root = temp_dir.path().join("logs");
2318
2319 write_runtime_lifecycle_log(
2320 &logs_root.join("instance1_ping.copper"),
2321 test_stack("ping", 0, 1),
2322 Some("default"),
2323 )?;
2324 write_runtime_lifecycle_log(
2325 &logs_root.join("instance1_pong.copper"),
2326 test_stack("pong", 1, 1),
2327 Some("default"),
2328 )?;
2329 write_runtime_lifecycle_log(
2330 &logs_root.join("instance2_ping.copper"),
2331 test_stack("ping", 0, 2),
2332 Some("default"),
2333 )?;
2334 write_runtime_lifecycle_log(
2335 &logs_root.join("instance2_pong.copper"),
2336 test_stack("pong", 1, 2),
2337 Some("default"),
2338 )?;
2339
2340 let plan = DistributedReplayPlan::builder(&multi_config_path)?
2341 .discover_logs_under(&logs_root)?
2342 .register::<PingRegisteredApp>("ping")?
2343 .register::<PongRegisteredApp>("pong")?
2344 .instances([2])
2345 .build()?;
2346
2347 assert_eq!(plan.selected_instances, vec![2]);
2348 assert_eq!(plan.mission.as_deref(), Some("default"));
2349 assert_eq!(plan.assignments.len(), 2);
2350 assert_eq!(
2351 plan.assignment(2, "ping").unwrap().log.base_path,
2352 logs_root.join("instance2_ping.copper")
2353 );
2354 assert_eq!(
2355 plan.assignment(2, "pong").unwrap().log.base_path,
2356 logs_root.join("instance2_pong.copper")
2357 );
2358 Ok(())
2359 }
2360
2361 #[test]
2362 fn register_rejects_subsystem_code_mismatch() -> CuResult<()> {
2363 let temp_dir = TempDir::new()
2364 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2365 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2366
2367 let err = DistributedReplayPlan::builder(&multi_config_path)?
2368 .register::<PingWrongCodeApp>("ping")
2369 .unwrap_err();
2370 assert!(err.to_string().contains("declares subsystem code 99"));
2371 Ok(())
2372 }
2373
2374 #[test]
2375 fn build_reports_missing_logs_and_missing_registrations() -> CuResult<()> {
2376 let temp_dir = TempDir::new()
2377 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2378 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2379 let logs_root = temp_dir.path().join("logs");
2380
2381 write_runtime_lifecycle_log(
2382 &logs_root.join("instance1_ping.copper"),
2383 test_stack("ping", 0, 1),
2384 Some("default"),
2385 )?;
2386
2387 let err = DistributedReplayPlan::builder(&multi_config_path)?
2388 .discover_logs_under(&logs_root)?
2389 .register::<PingRegisteredApp>("ping")?
2390 .build()
2391 .unwrap_err();
2392 let err_text = err.to_string();
2393 assert!(err_text.contains("missing app registration for subsystem 'pong'"));
2394 assert!(err_text.contains("missing log for instance 1 subsystem 'pong'"));
2395 Ok(())
2396 }
2397
2398 #[test]
2399 fn build_reports_duplicate_logs_for_one_target() -> CuResult<()> {
2400 let temp_dir = TempDir::new()
2401 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2402 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2403 let logs_root = temp_dir.path().join("logs");
2404
2405 write_runtime_lifecycle_log(
2406 &logs_root.join("instance1_ping_a.copper"),
2407 test_stack("ping", 0, 1),
2408 Some("default"),
2409 )?;
2410 write_runtime_lifecycle_log(
2411 &logs_root.join("instance1_ping_b.copper"),
2412 test_stack("ping", 0, 1),
2413 Some("default"),
2414 )?;
2415 write_runtime_lifecycle_log(
2416 &logs_root.join("instance1_pong.copper"),
2417 test_stack("pong", 1, 1),
2418 Some("default"),
2419 )?;
2420
2421 let err = DistributedReplayPlan::builder(&multi_config_path)?
2422 .discover_logs_under(&logs_root)?
2423 .register::<PingRegisteredApp>("ping")?
2424 .register::<PongRegisteredApp>("pong")?
2425 .build()
2426 .unwrap_err();
2427 assert!(
2428 err.to_string()
2429 .contains("found 2 logs for instance 1 subsystem 'ping'")
2430 );
2431 Ok(())
2432 }
2433
2434 #[test]
2435 fn build_reports_mission_mismatch_across_selected_logs() -> CuResult<()> {
2436 let temp_dir = TempDir::new()
2437 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2438 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2439 let logs_root = temp_dir.path().join("logs");
2440
2441 write_runtime_lifecycle_log(
2442 &logs_root.join("instance1_ping.copper"),
2443 test_stack("ping", 0, 1),
2444 Some("default"),
2445 )?;
2446 write_runtime_lifecycle_log(
2447 &logs_root.join("instance1_pong.copper"),
2448 test_stack("pong", 1, 1),
2449 Some("recovery"),
2450 )?;
2451
2452 let err = DistributedReplayPlan::builder(&multi_config_path)?
2453 .discover_logs_under(&logs_root)?
2454 .register::<PingRegisteredApp>("ping")?
2455 .register::<PongRegisteredApp>("pong")?
2456 .build()
2457 .unwrap_err();
2458 assert!(
2459 err.to_string()
2460 .contains("selected logs disagree on mission: default, recovery")
2461 );
2462 Ok(())
2463 }
2464
2465 #[test]
2466 fn engine_steps_in_stable_causal_order() -> CuResult<()> {
2467 let plan = fake_plan(vec![
2468 fake_assignment(1, "ping", 0, fake_ping_session),
2469 fake_assignment(1, "pong", 1, fake_pong_session),
2470 ]);
2471
2472 let mut engine = plan.start()?;
2473 let mut order = Vec::new();
2474 while let Some(cursor) = engine.step_causal()? {
2475 order.push((cursor.subsystem_id, cursor.cl_id));
2476 }
2477
2478 assert_eq!(
2479 order,
2480 vec![
2481 ("ping".to_string(), 0),
2482 ("ping".to_string(), 1),
2483 ("pong".to_string(), 0),
2484 ("pong".to_string(), 1),
2485 ]
2486 );
2487 assert_eq!(engine.executed_nodes(), 4);
2488 Ok(())
2489 }
2490
2491 #[test]
2492 fn engine_goto_rebuilds_and_replays_to_target() -> CuResult<()> {
2493 let plan = fake_plan(vec![
2494 fake_assignment(1, "ping", 0, fake_ping_session),
2495 fake_assignment(1, "pong", 1, fake_pong_session),
2496 ]);
2497
2498 let mut engine = plan.start()?;
2499 engine.run_all()?;
2500 engine.goto(1, "pong", 0)?;
2501
2502 assert_eq!(engine.executed_nodes(), 3);
2503 let frontier = engine.current_frontier();
2504 assert_eq!(frontier.len(), 2);
2505 assert!(frontier.iter().any(|cursor| {
2506 cursor.instance_id == 1 && cursor.subsystem_id == "ping" && cursor.cl_id == 1
2507 }));
2508 assert!(frontier.iter().any(|cursor| {
2509 cursor.instance_id == 1 && cursor.subsystem_id == "pong" && cursor.cl_id == 0
2510 }));
2511 Ok(())
2512 }
2513
2514 #[test]
2515 fn engine_reports_unresolved_recorded_provenance() -> CuResult<()> {
2516 let plan = fake_plan(vec![
2517 fake_assignment(1, "ping", 0, fake_ping_session),
2518 fake_assignment(1, "pong", 1, fake_bad_pong_session),
2519 ]);
2520
2521 let err = match plan.start() {
2522 Ok(_) => return Err(CuError::from("expected distributed replay startup failure")),
2523 Err(err) => err,
2524 };
2525 assert!(
2526 err.to_string()
2527 .contains("Unresolved recorded provenance edge")
2528 );
2529 Ok(())
2530 }
2531
2532 #[test]
2533 fn engine_run_all_scales_across_many_identical_instances() -> CuResult<()> {
2534 let mut engine = stress_plan(6, stress_session_ci).start()?;
2535 let order = collect_engine_order(&mut engine)?;
2536
2537 assert_stress_order_is_topological(&order, 6, 24);
2538 assert_eq!(engine.executed_nodes(), 6 * STRESS_SUBSYSTEMS.len() * 24);
2539
2540 let frontier = engine.current_frontier();
2541 assert_eq!(frontier.len(), 6 * STRESS_SUBSYSTEMS.len());
2542 for instance_id in 1..=6 {
2543 for (subsystem_id, _) in STRESS_SUBSYSTEMS {
2544 assert!(frontier.iter().any(|cursor| {
2545 cursor.instance_id == instance_id
2546 && cursor.subsystem_id == subsystem_id
2547 && cursor.cl_id == 23
2548 }));
2549 }
2550 }
2551 Ok(())
2552 }
2553
2554 #[test]
2555 fn engine_goto_matches_manual_replay_on_large_graph() -> CuResult<()> {
2556 let plan = stress_plan(5, stress_session_goto);
2557 let mut manual = plan.clone().start()?;
2558
2559 let (expected_steps, expected_frontier) = {
2560 let mut expected_steps = 0usize;
2561 loop {
2562 let Some(cursor) = manual.step_causal()? else {
2563 return Err(CuError::from(
2564 "manual distributed replay exhausted before reaching stress target",
2565 ));
2566 };
2567 expected_steps += 1;
2568 if cursor.instance_id == 4 && cursor.subsystem_id == "control" && cursor.cl_id == 17
2569 {
2570 break (expected_steps, manual.current_frontier());
2571 }
2572 }
2573 };
2574
2575 let mut via_goto = plan.start()?;
2576 via_goto.goto(4, "control", 17)?;
2577
2578 assert_eq!(via_goto.executed_nodes(), expected_steps);
2579 assert_eq!(via_goto.current_frontier(), expected_frontier);
2580 Ok(())
2581 }
2582
2583 #[test]
2584 #[ignore = "stress"]
2585 fn engine_heavy_stress_run_all_completes() -> CuResult<()> {
2586 let mut engine = stress_plan(12, stress_session_heavy).start()?;
2587 engine.run_all()?;
2588
2589 let expected = 12 * STRESS_SUBSYSTEMS.len() * 96;
2590 assert_eq!(engine.executed_nodes(), expected);
2591 assert_eq!(
2592 engine.current_frontier().len(),
2593 12 * STRESS_SUBSYSTEMS.len()
2594 );
2595 Ok(())
2596 }
2597}