// cu29_runtime — distributed_replay.rs

1//! Discovery, validation, planning, and causal execution helpers for
2//! distributed deterministic replay.
3//!
4//! The distributed replay flow is:
5//! - discover Copper logs and recover runtime identity from lifecycle metadata
6//! - validate those logs against a strict multi-Copper topology
7//! - register the generated replayable app type for each subsystem
8//! - build one replay session per `(instance_id, subsystem_id)` assignment
9//! - stitch sessions together through recorded message provenance
10//! - replay the fleet in a stable causal order
11
12use crate::app::{
13    CuDistributedReplayApplication, CuRecordedReplayApplication, CuSimApplication, Subsystem,
14};
15use crate::config::{MultiCopperConfig, read_configuration_str, read_multi_configuration};
16use crate::copperlist::CopperList;
17use crate::curuntime::{
18    KeyFrame, RuntimeLifecycleConfigSource, RuntimeLifecycleEvent, RuntimeLifecycleRecord,
19    RuntimeLifecycleStackInfo,
20};
21use crate::debug::{
22    SectionIndexEntry, build_read_logger, decode_copperlists, index_log, read_section_at,
23};
24use crate::simulation::recorded_copperlist_timestamp;
25use bincode::config::standard;
26use bincode::decode_from_std_read;
27use bincode::error::DecodeError;
28use cu29_clock::{RobotClock, RobotClockMock};
29use cu29_traits::{CopperListTuple, CuError, CuResult, ErasedCuStampedDataSet, UnifiedLogType};
30use cu29_unifiedlog::memmap::MmapSectionStorage;
31use cu29_unifiedlog::{
32    NoopLogger, NoopSectionStorage, SectionStorage, UnifiedLogWrite, UnifiedLogger,
33    UnifiedLoggerBuilder, UnifiedLoggerIOReader, UnifiedLoggerRead, UnifiedLoggerWrite,
34};
35use std::any::type_name;
36use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
37use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
38use std::fs;
39use std::io::Read;
40use std::path::{Path, PathBuf};
41use std::sync::{Arc, Mutex};
42
/// One discovered Copper log that can participate in distributed replay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedReplayLog {
    /// Base path of the log (e.g. `foo.copper`), never a physical slab path.
    pub base_path: PathBuf,
    /// Runtime stack identity recovered from the lifecycle metadata.
    pub stack: RuntimeLifecycleStackInfo,
    /// Where the runtime's configuration originally came from.
    pub config_source: RuntimeLifecycleConfigSource,
    /// Effective RON configuration the runtime actually ran with.
    pub effective_config_ron: String,
    /// Mission name from the first `MissionStarted` record, if one was logged.
    pub mission: Option<String>,
}
52
impl DistributedReplayLog {
    /// Discover a single Copper log from either its base path (`foo.copper`) or
    /// one of its slab paths (`foo_0.copper`, `foo_1.copper`, ...).
    pub fn discover(path: impl AsRef<Path>) -> CuResult<Self> {
        let requested_path = path.as_ref();
        let normalized_path = normalize_candidate_log_base(requested_path);
        match Self::discover_from_base_path(requested_path) {
            Ok(log) => Ok(log),
            // Fall back to the normalized base only when normalization actually
            // changed the path (i.e. the caller handed us a slab path). The
            // original error is deliberately discarded in that case.
            Err(_) if normalized_path != requested_path => {
                Self::discover_from_base_path(&normalized_path)
            }
            Err(err) => Err(err),
        }
    }

    /// Open the log at `base_path` and recover its runtime identity from the
    /// earliest lifecycle records.
    fn discover_from_base_path(base_path: &Path) -> CuResult<Self> {
        let UnifiedLogger::Read(read_logger) = UnifiedLoggerBuilder::new()
            .file_base_name(base_path)
            .build()
            .map_err(|err| {
                CuError::new_with_cause(
                    &format!(
                        "Failed to open Copper log '{}' for distributed replay discovery",
                        base_path.display()
                    ),
                    err,
                )
            })?
        else {
            return Err(CuError::from(
                "Expected a readable unified logger during distributed replay discovery",
            ));
        };

        let mut reader = UnifiedLoggerIOReader::new(read_logger, UnifiedLogType::RuntimeLifecycle);
        // First `Instantiated` record seen: (config source, effective RON, stack).
        let mut instantiated: Option<(
            RuntimeLifecycleConfigSource,
            String,
            RuntimeLifecycleStackInfo,
        )> = None;
        let mut mission = None;

        // Scan lifecycle records, keeping only the FIRST Instantiated and the
        // FIRST MissionStarted, and stop as soon as both are known.
        while let Some(record) =
            read_next_entry::<RuntimeLifecycleRecord>(&mut reader).map_err(|err| {
                CuError::from(format!(
                    "Failed to decode runtime lifecycle for '{}': {err}",
                    base_path.display()
                ))
            })?
        {
            match record.event {
                RuntimeLifecycleEvent::Instantiated {
                    config_source,
                    effective_config_ron,
                    stack,
                } if instantiated.is_none() => {
                    instantiated = Some((config_source, effective_config_ron, stack));
                }
                RuntimeLifecycleEvent::MissionStarted {
                    mission: started_mission,
                } if mission.is_none() => {
                    mission = Some(started_mission);
                }
                _ => {}
            }

            if instantiated.is_some() && mission.is_some() {
                break;
            }
        }

        // An Instantiated record is mandatory; a mission is optional.
        let Some((config_source, effective_config_ron, stack)) = instantiated else {
            return Err(CuError::from(format!(
                "Copper log '{}' has no RuntimeLifecycle::Instantiated record",
                base_path.display()
            )));
        };

        Ok(Self {
            base_path: base_path.to_path_buf(),
            stack,
            config_source,
            effective_config_ron,
            mission,
        })
    }

    /// Runtime instance id recorded in the stack metadata.
    #[inline]
    pub fn instance_id(&self) -> u32 {
        self.stack.instance_id
    }

    /// Numeric subsystem code recorded in the stack metadata.
    #[inline]
    pub fn subsystem_code(&self) -> u16 {
        self.stack.subsystem_code
    }

    /// Subsystem id string, if this log was produced by a multi-Copper subsystem.
    #[inline]
    pub fn subsystem_id(&self) -> Option<&str> {
        self.stack.subsystem_id.as_deref()
    }
}
155
/// Discovery error recorded for one log candidate.
#[derive(Debug, Clone)]
pub struct DistributedReplayDiscoveryFailure {
    /// The candidate base path that failed discovery.
    pub candidate_path: PathBuf,
    /// Human-readable rendering of the discovery error.
    pub error: String,
}
162
163impl Display for DistributedReplayDiscoveryFailure {
164    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
165        write!(
166            f,
167            "{}: {}",
168            self.candidate_path.display(),
169            self.error.as_str()
170        )
171    }
172}
173
/// Result of scanning one or more paths for distributed replay logs.
#[derive(Debug, Clone, Default)]
pub struct DistributedReplayCatalog {
    /// Successfully discovered logs, sorted by identity then base path.
    pub logs: Vec<DistributedReplayLog>,
    /// Per-candidate discovery failures, sorted by candidate path.
    pub failures: Vec<DistributedReplayDiscoveryFailure>,
}
180
181impl DistributedReplayCatalog {
182    /// Discover logs from a list of files and/or directories.
183    ///
184    /// Directories are traversed recursively. Any physical slab file
185    /// (`*_0.copper`, `*_1.copper`, ...) is normalized back to its base log path.
186    pub fn discover<I, P>(inputs: I) -> CuResult<Self>
187    where
188        I: IntoIterator<Item = P>,
189        P: AsRef<Path>,
190    {
191        let mut candidates = BTreeSet::new();
192        for input in inputs {
193            collect_candidate_base_paths(input.as_ref(), &mut candidates)?;
194        }
195
196        let mut logs = Vec::new();
197        let mut failures = Vec::new();
198
199        for candidate in candidates {
200            match DistributedReplayLog::discover(&candidate) {
201                Ok(log) => logs.push(log),
202                Err(err) => failures.push(DistributedReplayDiscoveryFailure {
203                    candidate_path: candidate,
204                    error: err.to_string(),
205                }),
206            }
207        }
208
209        logs.sort_by(|left, right| {
210            (
211                left.instance_id(),
212                left.subsystem_code(),
213                left.subsystem_id(),
214                left.base_path.as_os_str(),
215            )
216                .cmp(&(
217                    right.instance_id(),
218                    right.subsystem_code(),
219                    right.subsystem_id(),
220                    right.base_path.as_os_str(),
221                ))
222        });
223        failures.sort_by(|left, right| left.candidate_path.cmp(&right.candidate_path));
224
225        Ok(Self { logs, failures })
226    }
227
228    /// Convenience wrapper for recursive discovery rooted at one directory.
229    pub fn discover_under(root: impl AsRef<Path>) -> CuResult<Self> {
230        Self::discover([root])
231    }
232}
233
/// Signature of the per-subsystem factory that turns one validated assignment
/// plus session options into a ready-to-run replay session.
type DistributedReplaySessionFactory = fn(
    &DistributedReplayAssignment,
    &DistributedReplaySessionConfig,
) -> CuResult<DistributedReplaySessionBuild>;

/// Max number of decoded log sections kept in each session's LRU cache.
const DEFAULT_SECTION_CACHE_CAP: usize = 8;
/// Default size (64 MiB) of replay output logs.
/// NOTE(review): not referenced in the visible part of this file — presumably
/// consumed where output logs are created; confirm against the rest of the file.
const DEFAULT_REPLAY_LOG_SIZE_BYTES: usize = 64 * 1024 * 1024;
241
/// Options applied when building each replay session.
#[derive(Debug, Clone, Default)]
struct DistributedReplaySessionConfig {
    /// When set, replayed logs are persisted under this directory;
    /// `None` replays without recording output logs.
    output_root: Option<PathBuf>,
}
246
/// Fleet-wide identity of one recorded copperlist: the runtime instance and
/// subsystem that produced it plus its copperlist id. Used to stitch message
/// provenance across sessions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct DistributedReplayOriginKey {
    instance_id: u32,
    subsystem_code: u16,
    cl_id: u64,
}
253
/// Public cursor pointing at one copperlist of one `(instance, subsystem)` log.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistributedReplayCursor {
    pub instance_id: u32,
    pub subsystem_id: String,
    pub cl_id: u64,
    // Kept private; exposed read-only through `subsystem_code()`.
    subsystem_code: u16,
}
261
262impl DistributedReplayCursor {
263    #[inline]
264    fn new(instance_id: u32, subsystem_id: String, subsystem_code: u16, cl_id: u64) -> Self {
265        Self {
266            instance_id,
267            subsystem_id,
268            cl_id,
269            subsystem_code,
270        }
271    }
272
273    #[inline]
274    pub fn subsystem_code(&self) -> u16 {
275        self.subsystem_code
276    }
277}
278
/// One node of the causal replay graph as described by a session: the cursor
/// to replay, its own origin key, and the origin keys of every message the
/// copperlist consumed (its causal dependencies).
#[derive(Debug, Clone)]
struct DistributedReplayNodeDescriptor {
    cursor: DistributedReplayCursor,
    origin_key: DistributedReplayOriginKey,
    incoming_origins: BTreeSet<DistributedReplayOriginKey>,
}
285
/// Materialized graph node used during causal scheduling.
/// NOTE(review): the scheduling code that consumes these fields is outside
/// this chunk — field semantics inferred from names; confirm against it.
#[derive(Debug, Clone)]
struct DistributedReplayGraphNode {
    cursor: DistributedReplayCursor,
    // Index of the owning session in the engine's session list.
    session_index: usize,
    // Indices of dependent nodes unlocked when this node completes.
    outgoing: Vec<usize>,
    initial_dependencies: usize,
    remaining_dependencies: usize,
}
294
/// Ready-queue entry for causal scheduling. The derived ordering is
/// lexicographic in declaration order: instance, subsystem code, copperlist
/// id, then node index — giving a stable causal replay order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DistributedReplayReadyNode {
    instance_id: u32,
    subsystem_code: u16,
    cl_id: u64,
    node_index: usize,
}
302
/// Output of a session factory: the type-erased session, its node
/// descriptors, and the optional path of the log it records to.
struct DistributedReplaySessionBuild {
    session: Box<dyn DistributedReplaySession>,
    nodes: Vec<DistributedReplayNodeDescriptor>,
    output_log_path: Option<PathBuf>,
}
308
/// Type-erased interface the engine uses to drive one per-subsystem session.
trait DistributedReplaySession {
    /// Bring the session to the state just after the given copperlist id.
    fn goto_cl(&mut self, cl_id: u64) -> CuResult<()>;
    /// Stop the session's tasks if they were started (idempotent).
    fn shutdown(&mut self) -> CuResult<()>;
}
313
/// Decoded copperlists of one log section, held in the session's LRU cache.
#[derive(Debug, Clone)]
struct RecordedReplayCachedSection<P: CopperListTuple> {
    entries: Vec<Arc<CopperList<P>>>,
}
318
/// Replay session backed by one recorded log: drives `app` through the
/// recorded copperlists under a mocked clock, with keyframe-based rewinds.
struct RecordedReplaySession<App, P, S, L>
where
    App: CuDistributedReplayApplication<S, L>,
    P: CopperListTuple,
    S: SectionStorage,
    L: UnifiedLogWrite<S> + 'static,
{
    assignment: DistributedReplayAssignment,
    app: App,
    // Mock clock rewound/advanced to each keyframe's timestamp during replay.
    clock_mock: RobotClockMock,
    log_reader: UnifiedLoggerRead,
    // Index of copperlist sections in the log (used for binary search).
    sections: Vec<SectionIndexEntry>,
    total_entries: usize,
    keyframes: Vec<KeyFrame>,
    // True once the app's tasks have been started.
    started: bool,
    // Index of the last replayed copperlist, if any.
    current_idx: Option<usize>,
    // Copperlist id of the last restored keyframe, if any.
    last_keyframe: Option<u64>,
    // LRU cache of decoded sections: map + recency order + capacity.
    cache: HashMap<usize, RecordedReplayCachedSection<P>>,
    cache_order: VecDeque<usize>,
    cache_cap: usize,
    phantom: std::marker::PhantomData<(S, L)>,
}
341
impl<App, P, S, L> RecordedReplaySession<App, P, S, L>
where
    App: CuDistributedReplayApplication<S, L>
        + CuRecordedReplayApplication<S, L, RecordedDataSet = P>,
    P: CopperListTuple + 'static,
    S: SectionStorage,
    L: UnifiedLogWrite<S> + 'static,
{
    /// Open the recorded log at `log_base`, index its sections and keyframes,
    /// and wrap everything into a replay session for `assignment`.
    fn from_log(
        assignment: DistributedReplayAssignment,
        app: App,
        clock_mock: RobotClockMock,
        log_base: &Path,
    ) -> CuResult<Self> {
        // The codec must decode with the exact config the log was recorded with.
        crate::logcodec::set_effective_config_ron::<P>(&assignment.log.effective_config_ron);
        let (sections, keyframes, total_entries) =
            index_log::<P, _>(log_base, &recorded_copperlist_timestamp::<P>)?;
        let log_reader = build_read_logger(log_base)?;
        Ok(Self {
            assignment,
            app,
            clock_mock,
            log_reader,
            sections,
            total_entries,
            keyframes,
            started: false,
            current_idx: None,
            last_keyframe: None,
            cache: HashMap::new(),
            cache_order: VecDeque::new(),
            cache_cap: DEFAULT_SECTION_CACHE_CAP,
            phantom: std::marker::PhantomData,
        })
    }

    /// Produce one descriptor per recorded copperlist, pairing each cursor
    /// with the origin keys of the messages that copperlist consumed.
    fn describe_nodes(&mut self) -> CuResult<Vec<DistributedReplayNodeDescriptor>> {
        let mut nodes = Vec::with_capacity(self.total_entries);
        for idx in 0..self.total_entries {
            let (copperlist, _) = self.copperlist_at(idx)?;
            let cursor = DistributedReplayCursor::new(
                self.assignment.instance_id,
                self.assignment.subsystem_id.clone(),
                self.assignment.log.subsystem_code(),
                copperlist.id,
            );
            nodes.push(DistributedReplayNodeDescriptor {
                origin_key: DistributedReplayOriginKey {
                    instance_id: cursor.instance_id,
                    subsystem_code: cursor.subsystem_code(),
                    cl_id: cursor.cl_id,
                },
                incoming_origins: copperlist_origins(copperlist.as_ref()),
                cursor,
            });
        }
        Ok(nodes)
    }

    /// Start the app's tasks once; subsequent calls are no-ops.
    fn ensure_started(&mut self) -> CuResult<()> {
        if self.started {
            return Ok(());
        }
        // Pass-through override: let the runtime execute every sim step.
        let mut noop = |_step: App::Step<'_>| crate::simulation::SimOverride::ExecuteByRuntime;
        <App as CuSimApplication<S, L>>::start_all_tasks(&mut self.app, &mut noop)?;
        self.started = true;
        Ok(())
    }

    /// Latest keyframe at or before the target copperlist id, if any.
    fn nearest_keyframe(&self, target_cl_id: u64) -> Option<KeyFrame> {
        self.keyframes
            .iter()
            .filter(|keyframe| keyframe.culistid <= target_cl_id)
            .max_by_key(|keyframe| keyframe.culistid)
            .cloned()
    }

    /// Restore app state from `keyframe` and rewind the mock clock to it.
    fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
        <App as CuSimApplication<S, L>>::restore_keyframe(&mut self.app, keyframe)?;
        self.clock_mock.set_value(keyframe.timestamp.as_nanos());
        self.last_keyframe = Some(keyframe.culistid);
        Ok(())
    }

    /// Binary-search the section whose `[start_idx, start_idx + len)` range
    /// contains the global entry index `idx`.
    fn find_section_for_index(&self, idx: usize) -> Option<usize> {
        self.sections
            .binary_search_by(|section| {
                if idx < section.start_idx {
                    std::cmp::Ordering::Greater
                } else if idx >= section.start_idx + section.len {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .ok()
    }

    /// Binary-search the section whose `[first_id, last_id]` range contains
    /// the copperlist id `cl_id`.
    fn find_section_for_cl_id(&self, cl_id: u64) -> Option<usize> {
        self.sections
            .binary_search_by(|section| {
                if cl_id < section.first_id {
                    std::cmp::Ordering::Greater
                } else if cl_id > section.last_id {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .ok()
    }

    /// Mark `key` as most recently used and evict the oldest cached sections
    /// until the cache is back within capacity.
    fn touch_cache(&mut self, key: usize) {
        if let Some(position) = self.cache_order.iter().position(|entry| *entry == key) {
            self.cache_order.remove(position);
        }
        self.cache_order.push_back(key);
        while self.cache_order.len() > self.cache_cap {
            if let Some(oldest) = self.cache_order.pop_front() {
                self.cache.remove(&oldest);
            }
        }
    }

    /// Return the decoded copperlists of one section, decoding from the log
    /// on a cache miss and refreshing LRU order on a hit.
    fn load_section(&mut self, section_idx: usize) -> CuResult<&RecordedReplayCachedSection<P>> {
        if self.cache.contains_key(&section_idx) {
            self.touch_cache(section_idx);
            return Ok(self.cache.get(&section_idx).expect("cache entry exists"));
        }

        let entry = &self.sections[section_idx];
        let (header, data) = read_section_at(&mut self.log_reader, entry.pos)?;
        if header.entry_type != UnifiedLogType::CopperList {
            return Err(CuError::from(
                "Section type mismatch while loading distributed replay copperlists",
            ));
        }
        let (entries, _) = decode_copperlists::<P, _>(&data, &recorded_copperlist_timestamp::<P>)?;
        self.cache
            .insert(section_idx, RecordedReplayCachedSection { entries });
        self.touch_cache(section_idx);
        Ok(self.cache.get(&section_idx).expect("cache entry exists"))
    }

    /// Fetch the copperlist at global index `idx`, plus the keyframe recorded
    /// for exactly that copperlist id, if any.
    fn copperlist_at(&mut self, idx: usize) -> CuResult<(Arc<CopperList<P>>, Option<KeyFrame>)> {
        let section_idx = self
            .find_section_for_index(idx)
            .ok_or_else(|| CuError::from("Distributed replay index is outside the log"))?;
        let start_idx = self.sections[section_idx].start_idx;
        let section = self.load_section(section_idx)?;
        let local_idx = idx - start_idx;
        let copperlist = section
            .entries
            .get(local_idx)
            .ok_or_else(|| CuError::from("Corrupt distributed replay section index"))?
            .clone();
        let keyframe = self
            .keyframes
            .iter()
            .find(|keyframe| keyframe.culistid == copperlist.id)
            .cloned();
        Ok((copperlist, keyframe))
    }

    /// Translate a copperlist id into its global entry index: binary search
    /// for the section, then a linear scan inside it.
    fn index_for_cl_id(&mut self, cl_id: u64) -> CuResult<usize> {
        let section_idx = self
            .find_section_for_cl_id(cl_id)
            .ok_or_else(|| CuError::from("Requested CopperList id is not present in the log"))?;
        let start_idx = self.sections[section_idx].start_idx;
        let section = self.load_section(section_idx)?;
        for (offset, copperlist) in section.entries.iter().enumerate() {
            if copperlist.id == cl_id {
                return Ok(start_idx + offset);
            }
        }
        Err(CuError::from(
            "Requested CopperList id is missing from its indexed log section",
        ))
    }

    /// Replay the inclusive index range, feeding each copperlist to the app.
    /// `replay_keyframe` (from a rewind) is only applied to the copperlist it
    /// belongs to; otherwise the keyframe recorded alongside the entry is used.
    fn replay_range(
        &mut self,
        start_idx: usize,
        end_idx: usize,
        replay_keyframe: Option<&KeyFrame>,
    ) -> CuResult<()> {
        for idx in start_idx..=end_idx {
            let (copperlist, keyframe) = self.copperlist_at(idx)?;
            let keyframe = replay_keyframe
                .filter(|candidate| candidate.culistid == copperlist.id)
                .or(keyframe
                    .as_ref()
                    .filter(|candidate| candidate.culistid == copperlist.id));
            <App as CuRecordedReplayApplication<S, L>>::replay_recorded_copperlist(
                &mut self.app,
                &self.clock_mock,
                copperlist.as_ref(),
                keyframe,
            )?;
            self.current_idx = Some(idx);
        }
        Ok(())
    }

    /// Move the session to `target_idx`: replay forward from the current
    /// position, or rewind via the nearest keyframe when going backward or
    /// starting cold.
    fn goto_index(&mut self, target_idx: usize) -> CuResult<()> {
        self.ensure_started()?;
        if target_idx >= self.total_entries {
            return Err(CuError::from(
                "Distributed replay target is outside the log",
            ));
        }

        let (target_copperlist, _) = self.copperlist_at(target_idx)?;
        let target_cl_id = target_copperlist.id;

        let replay_start_idx;
        let replay_keyframe;

        if let Some(current_idx) = self.current_idx {
            if current_idx == target_idx {
                // Already there — nothing to replay.
                return Ok(());
            }

            if target_idx > current_idx {
                // Forward: continue from the next entry, no state restore needed.
                replay_start_idx = current_idx + 1;
                replay_keyframe = None;
            } else {
                // Backward: restore the nearest keyframe at or before the target
                // and replay forward from it.
                let keyframe = self.nearest_keyframe(target_cl_id).ok_or_else(|| {
                    CuError::from("No keyframe is available to rewind distributed replay")
                })?;
                self.restore_keyframe(&keyframe)?;
                replay_start_idx = self.index_for_cl_id(keyframe.culistid)?;
                replay_keyframe = Some(keyframe);
            }
        } else {
            // Cold start: bootstrap from the nearest keyframe.
            let keyframe = self.nearest_keyframe(target_cl_id).ok_or_else(|| {
                CuError::from("No keyframe is available to initialize distributed replay")
            })?;
            self.restore_keyframe(&keyframe)?;
            replay_start_idx = self.index_for_cl_id(keyframe.culistid)?;
            replay_keyframe = Some(keyframe);
        }

        self.replay_range(replay_start_idx, target_idx, replay_keyframe.as_ref())
    }
}
588
589impl<App, P, S, L> DistributedReplaySession for RecordedReplaySession<App, P, S, L>
590where
591    App: CuDistributedReplayApplication<S, L>
592        + CuRecordedReplayApplication<S, L, RecordedDataSet = P>,
593    P: CopperListTuple + 'static,
594    S: SectionStorage,
595    L: UnifiedLogWrite<S> + 'static,
596{
597    fn goto_cl(&mut self, cl_id: u64) -> CuResult<()> {
598        let target_idx = self.index_for_cl_id(cl_id)?;
599        self.goto_index(target_idx)
600    }
601
602    fn shutdown(&mut self) -> CuResult<()> {
603        if !self.started {
604            return Ok(());
605        }
606
607        let mut noop = |_step: App::Step<'_>| crate::simulation::SimOverride::ExecuteByRuntime;
608        <App as CuSimApplication<S, L>>::stop_all_tasks(&mut self.app, &mut noop)?;
609        self.started = false;
610        Ok(())
611    }
612}
613
/// One typed subsystem registration provided to the distributed replay builder.
#[derive(Clone)]
pub struct DistributedReplayAppRegistration {
    /// The subsystem this app type was generated for.
    pub subsystem: Subsystem,
    /// `type_name` of the registered app type, for diagnostics.
    pub app_type_name: &'static str,
    // Factory that builds a replay session for one validated assignment.
    session_factory: DistributedReplaySessionFactory,
}
621
622impl Debug for DistributedReplayAppRegistration {
623    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
624        f.debug_struct("DistributedReplayAppRegistration")
625            .field("subsystem", &self.subsystem)
626            .field("app_type_name", &self.app_type_name)
627            .finish()
628    }
629}
630
impl PartialEq for DistributedReplayAppRegistration {
    // Equality compares only the subsystem and the app type name.
    // `session_factory` is deliberately excluded — presumably because
    // fn-pointer comparison is unreliable and the factory is derived from
    // the app type; NOTE(review): confirm one app type always yields the
    // same factory.
    fn eq(&self, other: &Self) -> bool {
        self.subsystem == other.subsystem && self.app_type_name == other.app_type_name
    }
}

impl Eq for DistributedReplayAppRegistration {}
638
/// One validated log assignment for a subsystem instance.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedReplayAssignment {
    /// Runtime instance this assignment replays.
    pub instance_id: u32,
    /// Subsystem id within that instance.
    pub subsystem_id: String,
    /// The discovered log matched to this `(instance, subsystem)` pair.
    pub log: DistributedReplayLog,
    /// The typed app registration that will replay this log.
    pub registration: DistributedReplayAppRegistration,
}
647
/// Validated replay plan produced by [`DistributedReplayBuilder`].
#[derive(Debug, Clone)]
pub struct DistributedReplayPlan {
    /// Path of the strict multi-Copper config the plan was validated against.
    pub multi_config_path: PathBuf,
    /// The parsed multi-Copper topology.
    pub multi_config: MultiCopperConfig,
    /// Discovery results (logs plus any per-candidate failures).
    pub catalog: DistributedReplayCatalog,
    /// Instance ids included in the plan.
    pub selected_instances: Vec<u32>,
    /// Common mission, if one was recorded.
    pub mission: Option<String>,
    /// All subsystem app registrations supplied to the builder.
    pub registrations: Vec<DistributedReplayAppRegistration>,
    /// One validated assignment per `(instance_id, subsystem_id)` pair.
    pub assignments: Vec<DistributedReplayAssignment>,
}
659
660impl DistributedReplayPlan {
661    #[inline]
662    pub fn builder(multi_config_path: impl AsRef<Path>) -> CuResult<DistributedReplayBuilder> {
663        DistributedReplayBuilder::new(multi_config_path)
664    }
665
666    #[inline]
667    pub fn assignment(
668        &self,
669        instance_id: u32,
670        subsystem_id: &str,
671    ) -> Option<&DistributedReplayAssignment> {
672        self.assignments.iter().find(|assignment| {
673            assignment.instance_id == instance_id && assignment.subsystem_id == subsystem_id
674        })
675    }
676
677    /// Build a causal distributed replay engine from this validated plan.
678    pub fn start(self) -> CuResult<DistributedReplayEngine> {
679        DistributedReplayEngine::new(self, DistributedReplaySessionConfig::default())
680    }
681
682    /// Build a causal distributed replay engine and persist replayed logs under `output_root`.
683    pub fn start_recording_logs_under(
684        self,
685        output_root: impl AsRef<Path>,
686    ) -> CuResult<DistributedReplayEngine> {
687        DistributedReplayEngine::new(
688            self,
689            DistributedReplaySessionConfig {
690                output_root: Some(output_root.as_ref().to_path_buf()),
691            },
692        )
693    }
694}
695
/// Aggregated validation diagnostics emitted while constructing a distributed replay plan.
#[derive(Debug, Clone, Default)]
pub struct DistributedReplayValidationError {
    /// Every validation problem found, in discovery/validation order.
    pub issues: Vec<String>,
}
701
702impl DistributedReplayValidationError {
703    fn push(&mut self, issue: impl Into<String>) {
704        self.issues.push(issue.into());
705    }
706
707    fn is_empty(&self) -> bool {
708        self.issues.is_empty()
709    }
710}
711
712impl Display for DistributedReplayValidationError {
713    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
714        writeln!(f, "Distributed replay validation failed:")?;
715        for issue in &self.issues {
716            writeln!(f, " - {issue}")?;
717        }
718        Ok(())
719    }
720}
721
/// Builder for a validated distributed replay plan.
#[derive(Debug, Clone)]
pub struct DistributedReplayBuilder {
    /// Path of the multi-Copper config this builder validates against.
    multi_config_path: PathBuf,
    /// Parsed multi-Copper topology.
    multi_config: MultiCopperConfig,
    /// Accumulated discovery inputs (files and/or directories).
    discovery_inputs: Vec<PathBuf>,
    /// Discovered (or explicitly supplied) catalog; `None` until discovery runs.
    catalog: Option<DistributedReplayCatalog>,
    /// Registered app types, keyed by subsystem id.
    registrations: BTreeMap<String, DistributedReplayAppRegistration>,
    /// Optional restriction to a subset of instance ids.
    selected_instances: Option<BTreeSet<u32>>,
}
732
733impl DistributedReplayBuilder {
734    /// Load a strict multi-Copper config and start building a distributed replay plan.
735    pub fn new(multi_config_path: impl AsRef<Path>) -> CuResult<Self> {
736        let multi_config_path = multi_config_path.as_ref().to_path_buf();
737        let multi_config = read_multi_configuration(&multi_config_path.to_string_lossy())?;
738        Ok(Self {
739            multi_config_path,
740            multi_config,
741            discovery_inputs: Vec::new(),
742            catalog: None,
743            registrations: BTreeMap::new(),
744            selected_instances: None,
745        })
746    }
747
748    /// Replace the discovered catalog explicitly.
749    pub fn with_catalog(mut self, catalog: DistributedReplayCatalog) -> Self {
750        self.catalog = Some(catalog);
751        self
752    }
753
754    /// Discover logs from files and/or directories.
755    ///
756    /// Directories are walked recursively by [`DistributedReplayCatalog`].
757    pub fn discover_logs<I, P>(mut self, inputs: I) -> CuResult<Self>
758    where
759        I: IntoIterator<Item = P>,
760        P: AsRef<Path>,
761    {
762        self.discovery_inputs
763            .extend(inputs.into_iter().map(|path| path.as_ref().to_path_buf()));
764        self.catalog = Some(DistributedReplayCatalog::discover(
765            self.discovery_inputs.iter().collect::<Vec<_>>(),
766        )?);
767        Ok(self)
768    }
769
770    /// Convenience wrapper for recursive discovery under one root directory.
771    pub fn discover_logs_under(self, root: impl AsRef<Path>) -> CuResult<Self> {
772        self.discover_logs([root.as_ref().to_path_buf()])
773    }
774
775    /// Restrict plan construction to a subset of instance ids.
776    pub fn instances<I>(mut self, instances: I) -> Self
777    where
778        I: IntoIterator<Item = u32>,
779    {
780        self.selected_instances = Some(instances.into_iter().collect());
781        self
782    }
783
    /// Register the generated app type expected for one subsystem.
    ///
    /// Validates, in order: the subsystem is not already registered, it exists
    /// in the multi-Copper config, the app type was generated for a subsystem,
    /// its declared subsystem id matches `subsystem_id`, and its subsystem
    /// code matches the config's expectation.
    pub fn register<App>(mut self, subsystem_id: &str) -> CuResult<Self>
    where
        App: CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>
            + CuDistributedReplayApplication<MmapSectionStorage, UnifiedLoggerWrite>
            + 'static,
    {
        // Each subsystem may be registered at most once.
        if self.registrations.contains_key(subsystem_id) {
            return Err(CuError::from(format!(
                "Subsystem '{}' is already registered for distributed replay",
                subsystem_id
            )));
        }

        // The subsystem must exist in the multi-Copper topology.
        let expected_subsystem = self.multi_config.subsystem(subsystem_id).ok_or_else(|| {
            CuError::from(format!(
                "Multi-Copper config '{}' does not define subsystem '{}'",
                self.multi_config_path.display(),
                subsystem_id
            ))
        })?;

        // The app type must have been generated for some subsystem at all.
        let registered_subsystem = App::subsystem();
        let Some(registered_subsystem_id) = registered_subsystem.id() else {
            return Err(CuError::from(format!(
                "App type '{}' was not generated for a multi-Copper subsystem and cannot be registered for distributed replay",
                type_name::<App>()
            )));
        };

        // ... and for THIS subsystem specifically.
        if registered_subsystem_id != subsystem_id {
            return Err(CuError::from(format!(
                "App type '{}' declares subsystem '{}' but was registered as '{}'",
                type_name::<App>(),
                registered_subsystem_id,
                subsystem_id
            )));
        }

        // The numeric subsystem code must agree with the config.
        let registered_subsystem_code = registered_subsystem.code();
        if registered_subsystem_code != expected_subsystem.subsystem_code {
            return Err(CuError::from(format!(
                "App type '{}' declares subsystem code {} for '{}' but multi-Copper config '{}' expects {}",
                type_name::<App>(),
                registered_subsystem_code,
                subsystem_id,
                self.multi_config_path.display(),
                expected_subsystem.subsystem_code
            )));
        }

        self.registrations.insert(
            subsystem_id.to_string(),
            DistributedReplayAppRegistration {
                subsystem: registered_subsystem,
                app_type_name: type_name::<App>(),
                // The monomorphized factory captures the concrete App type.
                session_factory: build_distributed_replay_session::<App>,
            },
        );
        Ok(self)
    }
845
    /// Validate discovery + registrations and prepare a typed replay plan.
    ///
    /// Cross-checks three inputs against each other: the discovered log
    /// catalog, the multi-Copper topology (`self.multi_config`), and the
    /// per-subsystem app registrations. Validation problems are accumulated
    /// into a single `DistributedReplayValidationError` so one call reports
    /// every mismatch at once instead of failing on the first.
    ///
    /// On success, returns a `DistributedReplayPlan` with exactly one
    /// `(instance_id, subsystem_id)` assignment per selected instance and
    /// configured subsystem, sorted into a stable causal-friendly order.
    pub fn build(self) -> CuResult<DistributedReplayPlan> {
        // Prefer an explicitly injected catalog; otherwise run discovery on
        // the provided inputs, or fall back to an empty catalog when the
        // builder was given nothing to discover.
        let catalog = match self.catalog {
            Some(catalog) => catalog,
            None if self.discovery_inputs.is_empty() => DistributedReplayCatalog::default(),
            None => DistributedReplayCatalog::discover(
                self.discovery_inputs.iter().collect::<Vec<_>>(),
            )?,
        };

        let mut validation = DistributedReplayValidationError::default();

        // Surface every discovery failure as a validation entry; they do not
        // abort planning on their own.
        for failure in &catalog.failures {
            validation.push(format!(
                "discovery failure for '{}': {}",
                failure.candidate_path.display(),
                failure.error
            ));
        }

        // Index configured subsystems by id for the checks below.
        let subsystem_map: BTreeMap<_, _> = self
            .multi_config
            .subsystems
            .iter()
            .map(|subsystem| (subsystem.id.clone(), subsystem))
            .collect();

        // Every configured subsystem must have a registered app type.
        for subsystem in subsystem_map.keys() {
            if !self.registrations.contains_key(subsystem) {
                validation.push(format!(
                    "missing app registration for subsystem '{}'",
                    subsystem
                ));
            }
        }

        let mut discovered_instances = BTreeSet::new();
        // Group discovered logs by (instance_id, subsystem_id) so duplicates
        // and gaps can be detected per target.
        let mut logs_by_target: BTreeMap<(u32, String), Vec<DistributedReplayLog>> =
            BTreeMap::new();

        for log in &catalog.logs {
            // A log without recorded subsystem identity cannot be assigned.
            let Some(subsystem_id) = log.subsystem_id() else {
                validation.push(format!(
                    "discovered log '{}' is missing subsystem_id runtime metadata",
                    log.base_path.display()
                ));
                continue;
            };

            // The log's subsystem must exist in the configured topology.
            let Some(expected_subsystem) = subsystem_map.get(subsystem_id) else {
                validation.push(format!(
                    "discovered log '{}' belongs to subsystem '{}' which is not present in multi-Copper config '{}'",
                    log.base_path.display(),
                    subsystem_id,
                    self.multi_config_path.display()
                ));
                continue;
            };

            // The recorded subsystem code must agree with the config; this is
            // reported but the log still participates in grouping below.
            if log.subsystem_code() != expected_subsystem.subsystem_code {
                validation.push(format!(
                    "discovered log '{}' reports subsystem code {} for '{}' but multi-Copper config '{}' expects {}",
                    log.base_path.display(),
                    log.subsystem_code(),
                    subsystem_id,
                    self.multi_config_path.display(),
                    expected_subsystem.subsystem_code
                ));
            }

            discovered_instances.insert(log.instance_id());
            logs_by_target
                .entry((log.instance_id(), subsystem_id.to_string()))
                .or_default()
                .push(log.clone());
        }

        // Exactly one log per (instance, subsystem) target is allowed.
        for ((instance_id, subsystem_id), logs) in &logs_by_target {
            if logs.len() > 1 {
                validation.push(format!(
                    "found {} logs for instance {} subsystem '{}': {}",
                    logs.len(),
                    instance_id,
                    subsystem_id,
                    join_log_paths(logs)
                ));
            }
        }

        // Either replay the caller-selected instances (validated against the
        // discovered set) or default to everything that was discovered.
        let selected_instances: Vec<u32> =
            if let Some(selected_instances) = &self.selected_instances {
                let mut selected_instances: Vec<_> = selected_instances.iter().copied().collect();
                selected_instances.sort_unstable();
                for instance_id in &selected_instances {
                    if !discovered_instances.contains(instance_id) {
                        validation.push(format!(
                            "selected instance {} has no discovered logs",
                            instance_id
                        ));
                    }
                }
                selected_instances
            } else {
                discovered_instances.iter().copied().collect()
            };

        if selected_instances.is_empty() {
            validation.push("no instances selected for distributed replay");
        }

        // Every selected instance needs a log for every configured subsystem.
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                if !logs_by_target.contains_key(&(*instance_id, subsystem.id.clone())) {
                    validation.push(format!(
                        "missing log for instance {} subsystem '{}'",
                        instance_id, subsystem.id
                    ));
                }
            }
        }

        // All selected logs that recorded a mission must agree on it.
        let mut known_missions = BTreeSet::new();
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                if let Some(logs) = logs_by_target.get(&(*instance_id, subsystem.id.clone()))
                    && let Some(log) = logs.first()
                    && let Some(mission) = &log.mission
                {
                    known_missions.insert(mission.clone());
                }
            }
        }
        if known_missions.len() > 1 {
            validation.push(format!(
                "selected logs disagree on mission: {}",
                known_missions.into_iter().collect::<Vec<_>>().join(", ")
            ));
        }

        // Fail with the full accumulated report before committing to a plan.
        if !validation.is_empty() {
            return Err(CuError::from(validation.to_string()));
        }

        // First recorded mission across selected targets (all agree — checked
        // above), if any log recorded one.
        let mission = selected_instances
            .iter()
            .flat_map(|instance_id| {
                self.multi_config.subsystems.iter().filter_map(|subsystem| {
                    logs_by_target
                        .get(&(*instance_id, subsystem.id.clone()))
                        .and_then(|logs| logs.first())
                        .and_then(|log| log.mission.clone())
                })
            })
            .next();

        // Stable ordering of registrations by subsystem id.
        let mut registrations: Vec<_> = self.registrations.into_values().collect();
        registrations.sort_by(|left, right| left.subsystem.id().cmp(&right.subsystem.id()));

        // Pair each (instance, subsystem) target with its single log and its
        // registration; validation above guarantees both exist.
        let mut assignments = Vec::new();
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                let log = logs_by_target
                    .get(&(*instance_id, subsystem.id.clone()))
                    .and_then(|logs| logs.first())
                    .expect("validated distributed replay plan is missing a log")
                    .clone();
                let registration = registrations
                    .iter()
                    .find(|registration| registration.subsystem.id() == Some(subsystem.id.as_str()))
                    .expect("validated distributed replay plan is missing a registration")
                    .clone();
                assignments.push(DistributedReplayAssignment {
                    instance_id: *instance_id,
                    subsystem_id: subsystem.id.clone(),
                    log,
                    registration,
                });
            }
        }
        // Deterministic assignment order: instance, then subsystem code, then
        // subsystem id as a tie-breaker.
        assignments.sort_by(|left, right| {
            (
                left.instance_id,
                left.registration.subsystem.code(),
                left.subsystem_id.as_str(),
            )
                .cmp(&(
                    right.instance_id,
                    right.registration.subsystem.code(),
                    right.subsystem_id.as_str(),
                ))
        });

        Ok(DistributedReplayPlan {
            multi_config_path: self.multi_config_path,
            multi_config: self.multi_config,
            catalog,
            selected_instances,
            mission,
            registrations,
            assignments,
        })
    }
1048}
1049
/// Build one replay session for a single `(instance, subsystem)` assignment.
///
/// Parses the effective configuration recorded in the assignment's log, then
/// constructs the app and a `RecordedReplaySession` over one of two logger
/// backends:
/// - with `session_config.output_root` set, a real memory-mapped unified
///   logger so replay output is written to disk next to the recorded name;
/// - otherwise, a no-op logger/storage pair that discards replay output.
///
/// Returns the boxed session, its node descriptors (from `describe_nodes`),
/// and the output log path when one was created.
fn build_distributed_replay_session<App>(
    assignment: &DistributedReplayAssignment,
    session_config: &DistributedReplaySessionConfig,
) -> CuResult<DistributedReplaySessionBuild>
where
    // The app must be buildable over both backends because the choice is made
    // at runtime from `session_config`.
    App: CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>
        + CuDistributedReplayApplication<MmapSectionStorage, UnifiedLoggerWrite>
        + 'static,
{
    // Recover the exact configuration the original run used, recorded as RON
    // in the log's lifecycle metadata.
    let config = read_configuration_str(assignment.log.effective_config_ron.clone(), None)
        .map_err(|err| {
            CuError::from(format!(
                "Failed to parse recorded effective config from '{}': {err}",
                assignment.log.base_path.display()
            ))
        })?;
    // Mocked clock: the session drives time explicitly during replay.
    let (clock, clock_mock) = RobotClock::mock();

    // Branch 1: replay with a writable output log.
    if let Some(output_root) = &session_config.output_root {
        let output_log_path = replay_output_log_path(output_root, assignment)?;
        let logger = build_replay_output_logger(
            &output_log_path,
            replay_output_log_size_bytes(assignment, &config),
        )?;
        let app = <App as CuDistributedReplayApplication<
            MmapSectionStorage,
            UnifiedLoggerWrite,
        >>::build_distributed_replay(
            clock.clone(), logger, assignment.instance_id, Some(config)
        )?;
        let mut session = RecordedReplaySession::<
            App,
            <App as CuRecordedReplayApplication<
                MmapSectionStorage,
                UnifiedLoggerWrite,
            >>::RecordedDataSet,
            MmapSectionStorage,
            UnifiedLoggerWrite,
        >::from_log(assignment.clone(), app, clock_mock, &assignment.log.base_path)?;
        let nodes = session.describe_nodes()?;
        return Ok(DistributedReplaySessionBuild {
            session: Box::new(session),
            nodes,
            output_log_path: Some(output_log_path),
        });
    }

    // Branch 2: replay without persisting any output.
    let logger = Arc::new(Mutex::new(NoopLogger::new()));
    let app = <App as CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>>::build_distributed_replay(
        clock,
        logger,
        assignment.instance_id,
        Some(config),
    )?;
    let mut session = RecordedReplaySession::<
        App,
        <App as CuRecordedReplayApplication<NoopSectionStorage, NoopLogger>>::RecordedDataSet,
        NoopSectionStorage,
        NoopLogger,
    >::from_log(
        assignment.clone(),
        app,
        clock_mock,
        &assignment.log.base_path,
    )?;
    let nodes = session.describe_nodes()?;
    Ok(DistributedReplaySessionBuild {
        session: Box::new(session),
        nodes,
        output_log_path: None,
    })
}
1122
1123fn replay_output_log_path(
1124    output_root: &Path,
1125    assignment: &DistributedReplayAssignment,
1126) -> CuResult<PathBuf> {
1127    let file_name = assignment
1128        .log
1129        .base_path
1130        .file_name()
1131        .ok_or_else(|| {
1132            CuError::from(format!(
1133                "Replay assignment log '{}' has no file name",
1134                assignment.log.base_path.display()
1135            ))
1136        })?
1137        .to_owned();
1138    Ok(output_root.join(file_name))
1139}
1140
1141fn build_replay_output_logger(
1142    path: &Path,
1143    preallocated_size: usize,
1144) -> CuResult<Arc<Mutex<UnifiedLoggerWrite>>> {
1145    if let Some(parent) = path.parent() {
1146        fs::create_dir_all(parent).map_err(|err| {
1147            CuError::new_with_cause(
1148                &format!(
1149                    "Failed to create replay log directory '{}'",
1150                    parent.display()
1151                ),
1152                err,
1153            )
1154        })?;
1155    }
1156    let UnifiedLogger::Write(writer) = UnifiedLoggerBuilder::new()
1157        .write(true)
1158        .create(true)
1159        .file_base_name(path)
1160        .preallocated_size(preallocated_size)
1161        .build()
1162        .map_err(|err| {
1163            CuError::new_with_cause(
1164                &format!("Failed to create replay log '{}'", path.display()),
1165                err,
1166            )
1167        })?
1168    else {
1169        return Err(CuError::from(format!(
1170            "Expected writable replay logger for '{}'",
1171            path.display()
1172        )));
1173    };
1174    Ok(Arc::new(Mutex::new(writer)))
1175}
1176
1177fn replay_output_log_size_bytes(
1178    assignment: &DistributedReplayAssignment,
1179    config: &crate::config::CuConfig,
1180) -> usize {
1181    if let Some(slab_zero) = slab_zero_path(&assignment.log.base_path)
1182        && let Ok(metadata) = fs::metadata(slab_zero)
1183        && let Ok(size) = usize::try_from(metadata.len())
1184    {
1185        return size.max(DEFAULT_REPLAY_LOG_SIZE_BYTES);
1186    }
1187
1188    config
1189        .logging
1190        .as_ref()
1191        .and_then(|logging| logging.slab_size_mib)
1192        .and_then(|size_mib| usize::try_from(size_mib).ok())
1193        .and_then(|size_mib| size_mib.checked_mul(1024 * 1024))
1194        .unwrap_or(DEFAULT_REPLAY_LOG_SIZE_BYTES)
1195}
1196
1197fn copperlist_origins<P: CopperListTuple>(
1198    copperlist: &CopperList<P>,
1199) -> BTreeSet<DistributedReplayOriginKey> {
1200    <CopperList<P> as ErasedCuStampedDataSet>::cumsgs(copperlist)
1201        .into_iter()
1202        .filter_map(|msg| msg.metadata().origin())
1203        .map(|origin| DistributedReplayOriginKey {
1204            instance_id: origin.instance_id,
1205            subsystem_code: origin.subsystem_code,
1206            cl_id: origin.cl_id,
1207        })
1208        .collect()
1209}
1210
/// Bundle of per-run engine state produced by
/// `DistributedReplayEngine::build_state` so that `reset()` can swap the
/// whole set atomically.
#[derive(Default)]
struct DistributedReplayEngineState {
    /// One boxed replay session per validated assignment, in plan order.
    sessions: Vec<Box<dyn DistributedReplaySession>>,
    /// Flattened causal graph nodes across all sessions.
    nodes: Vec<DistributedReplayGraphNode>,
    /// `(instance_id, subsystem_id, cl_id)` -> index into `nodes`.
    node_lookup: BTreeMap<(u32, String, u64), usize>,
    /// `(instance_id, subsystem_id)` -> path of the replay output log, when
    /// output logging is enabled for that session.
    output_log_paths: BTreeMap<(u32, String), PathBuf>,
    /// Nodes with no unmet dependencies, ordered by their ready key.
    ready: BTreeSet<DistributedReplayReadyNode>,
    /// Latest executed cursor per session (`None` until a session has run).
    frontier: Vec<Option<DistributedReplayCursor>>,
}
1220
/// One causal distributed replay engine built from a validated plan.
pub struct DistributedReplayEngine {
    /// The validated plan this engine was built from; kept so `reset()` can
    /// rebuild all sessions from scratch.
    plan: DistributedReplayPlan,
    /// Session construction options (e.g. output root), also needed by
    /// `reset()`.
    session_config: DistributedReplaySessionConfig,
    /// One replay session per assignment, in plan order.
    sessions: Vec<Box<dyn DistributedReplaySession>>,
    /// Flattened causal graph nodes across all sessions.
    nodes: Vec<DistributedReplayGraphNode>,
    /// `(instance_id, subsystem_id, cl_id)` -> index into `nodes`.
    node_lookup: BTreeMap<(u32, String, u64), usize>,
    /// `(instance_id, subsystem_id)` -> replay output log path, when enabled.
    output_log_paths: BTreeMap<(u32, String), PathBuf>,
    /// Nodes currently executable (all dependencies satisfied).
    ready: BTreeSet<DistributedReplayReadyNode>,
    /// Latest executed cursor per session.
    frontier: Vec<Option<DistributedReplayCursor>>,
    /// Per-node executed flags, indexed like `nodes`.
    executed: Vec<bool>,
    /// Count of `true` entries in `executed`.
    executed_count: usize,
}
1234
impl DistributedReplayEngine {
    /// Build a fresh engine from a validated plan: constructs every session,
    /// wires the causal graph, and seeds the ready set.
    fn new(
        plan: DistributedReplayPlan,
        session_config: DistributedReplaySessionConfig,
    ) -> CuResult<Self> {
        let state = Self::build_state(&plan, &session_config)?;
        let executed = vec![false; state.nodes.len()];
        Ok(Self {
            plan,
            session_config,
            sessions: state.sessions,
            nodes: state.nodes,
            node_lookup: state.node_lookup,
            output_log_paths: state.output_log_paths,
            ready: state.ready,
            frontier: state.frontier,
            executed,
            executed_count: 0,
        })
    }

    /// Construct all per-run state: build one session per assignment, turn
    /// the sessions' node descriptors into a dependency graph (intra-session
    /// sequential edges plus recorded cross-session provenance edges), and
    /// seed the ready set with every dependency-free node.
    fn build_state(
        plan: &DistributedReplayPlan,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplayEngineState> {
        let mut sessions = Vec::with_capacity(plan.assignments.len());
        let mut pending_nodes = Vec::new();
        let mut session_nodes = Vec::with_capacity(plan.assignments.len());
        let mut output_log_paths = BTreeMap::new();

        // Build each session via its registered factory and record, per
        // session, the order of its node descriptors.
        for assignment in &plan.assignments {
            let build = (assignment.registration.session_factory)(assignment, session_config)?;
            let session_index = sessions.len();
            let mut node_indices = Vec::with_capacity(build.nodes.len());
            for node in build.nodes {
                let pending_index = pending_nodes.len();
                pending_nodes.push((session_index, node));
                node_indices.push(pending_index);
            }
            if let Some(output_log_path) = build.output_log_path {
                // Each (instance, subsystem) may own at most one output log.
                let replaced = output_log_paths.insert(
                    (assignment.instance_id, assignment.subsystem_id.clone()),
                    output_log_path,
                );
                if replaced.is_some() {
                    return Err(CuError::from(format!(
                        "Duplicate replay output log assignment for instance {} subsystem '{}'",
                        assignment.instance_id, assignment.subsystem_id
                    )));
                }
            }
            sessions.push(build.session);
            session_nodes.push(node_indices);
        }

        let mut nodes = Vec::with_capacity(pending_nodes.len());
        // origin_lookup: provenance origin key -> node index (for edge
        // resolution); node_lookup: cursor key -> node index (for goto).
        let mut origin_lookup = BTreeMap::new();
        let mut node_lookup = BTreeMap::new();

        for (node_index, (session_index, descriptor)) in pending_nodes.iter().enumerate() {
            // Both keys must be unique across the whole fleet.
            if origin_lookup
                .insert(descriptor.origin_key.clone(), node_index)
                .is_some()
            {
                return Err(CuError::from(format!(
                    "Duplicate replay node detected for instance {} subsystem code {} CopperList {}",
                    descriptor.origin_key.instance_id,
                    descriptor.origin_key.subsystem_code,
                    descriptor.origin_key.cl_id
                )));
            }

            if node_lookup
                .insert(
                    (
                        descriptor.cursor.instance_id,
                        descriptor.cursor.subsystem_id.clone(),
                        descriptor.cursor.cl_id,
                    ),
                    node_index,
                )
                .is_some()
            {
                return Err(CuError::from(format!(
                    "Duplicate replay cursor detected for instance {} subsystem '{}' CopperList {}",
                    descriptor.cursor.instance_id,
                    descriptor.cursor.subsystem_id,
                    descriptor.cursor.cl_id
                )));
            }

            // Dependency counts are filled in by the edge passes below.
            nodes.push(DistributedReplayGraphNode {
                cursor: descriptor.cursor.clone(),
                session_index: *session_index,
                outgoing: Vec::new(),
                initial_dependencies: 0,
                remaining_dependencies: 0,
            });
        }

        // De-duplicate edges so a dependency is only counted once even when
        // both the sequential and provenance passes would add it.
        let mut edges = BTreeSet::new();

        // Pass 1: sequential edges — each session's CopperLists replay in
        // their recorded order.
        for node_indices in &session_nodes {
            for pair in node_indices.windows(2) {
                let from = pair[0];
                let to = pair[1];
                if edges.insert((from, to)) {
                    nodes[from].outgoing.push(to);
                    nodes[to].initial_dependencies += 1;
                }
            }
        }

        // Pass 2: provenance edges — a node depends on every recorded origin
        // feeding into it, across sessions.
        for (target_index, (_, descriptor)) in pending_nodes.iter().enumerate() {
            for origin in &descriptor.incoming_origins {
                let source_index = origin_lookup.get(origin).copied().ok_or_else(|| {
                    CuError::from(format!(
                        "Unresolved recorded provenance edge into instance {} subsystem '{}' CopperList {} from instance {} subsystem code {} CopperList {}",
                        descriptor.cursor.instance_id,
                        descriptor.cursor.subsystem_id,
                        descriptor.cursor.cl_id,
                        origin.instance_id,
                        origin.subsystem_code,
                        origin.cl_id
                    ))
                })?;
                // A self-edge would deadlock the topological execution.
                if source_index == target_index {
                    return Err(CuError::from(format!(
                        "Recorded provenance on instance {} subsystem '{}' CopperList {} points to itself",
                        descriptor.cursor.instance_id,
                        descriptor.cursor.subsystem_id,
                        descriptor.cursor.cl_id
                    )));
                }
                if edges.insert((source_index, target_index)) {
                    nodes[source_index].outgoing.push(target_index);
                    nodes[target_index].initial_dependencies += 1;
                }
            }
        }

        // Seed the ready set with every node that has no dependencies.
        let mut ready = BTreeSet::new();
        for (node_index, node) in nodes.iter_mut().enumerate() {
            node.remaining_dependencies = node.initial_dependencies;
            if node.remaining_dependencies == 0 {
                ready.insert(DistributedReplayReadyNode {
                    instance_id: node.cursor.instance_id,
                    subsystem_code: node.cursor.subsystem_code(),
                    cl_id: node.cursor.cl_id,
                    node_index,
                });
            }
        }

        // A non-empty graph with nothing ready means every node has an
        // unmet dependency, i.e. the graph is entirely cyclic.
        if !nodes.is_empty() && ready.is_empty() {
            return Err(CuError::from(
                "Distributed replay graph has no causally ready starting point",
            ));
        }

        Ok(DistributedReplayEngineState {
            frontier: vec![None; sessions.len()],
            sessions,
            nodes,
            node_lookup,
            output_log_paths,
            ready,
        })
    }

    /// Shut down every session in order, stopping at the first failure.
    fn shutdown_sessions(sessions: &mut Vec<Box<dyn DistributedReplaySession>>) -> CuResult<()> {
        for session in sessions.iter_mut() {
            session.shutdown()?;
        }
        Ok(())
    }

    /// Build the ready-set key for a node; ordering of these keys decides
    /// which ready node `step_causal` executes first.
    fn ready_key(&self, node_index: usize) -> DistributedReplayReadyNode {
        let node = &self.nodes[node_index];
        DistributedReplayReadyNode {
            instance_id: node.cursor.instance_id,
            subsystem_code: node.cursor.subsystem_code(),
            cl_id: node.cursor.cl_id,
            node_index,
        }
    }

    /// Reset all replay sessions and graph execution state back to the beginning.
    ///
    /// Existing sessions are shut down first, then the whole engine state is
    /// rebuilt from the stored plan and session config.
    pub fn reset(&mut self) -> CuResult<()> {
        Self::shutdown_sessions(&mut self.sessions)?;
        let state = Self::build_state(&self.plan, &self.session_config)?;
        self.sessions = state.sessions;
        self.nodes = state.nodes;
        self.node_lookup = state.node_lookup;
        self.output_log_paths = state.output_log_paths;
        self.ready = state.ready;
        self.frontier = state.frontier;
        self.executed = vec![false; self.nodes.len()];
        self.executed_count = 0;
        Ok(())
    }

    /// Replay the next causally ready CopperList, if any.
    ///
    /// Returns `Ok(None)` once every node has executed; errors if the ready
    /// set is empty while unexecuted nodes remain (a dependency cycle).
    pub fn step_causal(&mut self) -> CuResult<Option<DistributedReplayCursor>> {
        // The smallest ready key wins, giving a stable execution order.
        let Some(next_ready) = self.ready.iter().next().copied() else {
            if self.executed_count == self.nodes.len() {
                return Ok(None);
            }
            return Err(CuError::from(
                "Distributed replay is deadlocked: no causally ready CopperList remains",
            ));
        };
        self.ready.remove(&next_ready);

        let cursor = self.nodes[next_ready.node_index].cursor.clone();
        let session_index = self.nodes[next_ready.node_index].session_index;
        self.sessions[session_index].goto_cl(cursor.cl_id)?;
        self.executed[next_ready.node_index] = true;
        self.executed_count += 1;
        self.frontier[session_index] = Some(cursor.clone());

        // Release dependents; clone breaks the borrow on self.nodes so we
        // can mutate the dependent nodes below.
        let outgoing = self.nodes[next_ready.node_index].outgoing.clone();
        for dependent in outgoing {
            let node = &mut self.nodes[dependent];
            node.remaining_dependencies = node.remaining_dependencies.saturating_sub(1);
            if node.remaining_dependencies == 0 {
                self.ready.insert(self.ready_key(dependent));
            }
        }

        Ok(Some(cursor))
    }

    /// Replay the entire selected fleet to completion.
    pub fn run_all(&mut self) -> CuResult<()> {
        while self.step_causal()?.is_some() {}
        Ok(())
    }

    /// Rebuild the replay from scratch and advance until the target CopperList is reached.
    ///
    /// Errors if the `(instance_id, subsystem_id, cl_id)` target does not
    /// exist, or if replay finishes without executing it.
    pub fn goto(&mut self, instance_id: u32, subsystem_id: &str, cl_id: u64) -> CuResult<()> {
        let target = self
            .node_lookup
            .get(&(instance_id, subsystem_id.to_string(), cl_id))
            .copied()
            .ok_or_else(|| {
                CuError::from(format!(
                    "Distributed replay target instance {} subsystem '{}' CopperList {} does not exist",
                    instance_id, subsystem_id, cl_id
                ))
            })?;
        self.reset()?;
        while !self.executed[target] {
            let Some(_) = self.step_causal()? else {
                return Err(CuError::from(format!(
                    "Distributed replay exhausted before reaching instance {} subsystem '{}' CopperList {}",
                    instance_id, subsystem_id, cl_id
                )));
            };
        }
        Ok(())
    }

    /// Return the latest executed CopperList cursor for each replay session.
    /// Sessions that have not executed anything yet are omitted.
    pub fn current_frontier(&self) -> Vec<DistributedReplayCursor> {
        self.frontier
            .iter()
            .filter_map(|cursor| cursor.clone())
            .collect()
    }

    /// Path of the replay output log for `(instance_id, subsystem_id)`, when
    /// the engine was built with output logging enabled for that session.
    pub fn output_log_path(&self, instance_id: u32, subsystem_id: &str) -> Option<&Path> {
        self.output_log_paths
            .get(&(instance_id, subsystem_id.to_string()))
            .map(PathBuf::as_path)
    }

    /// Total number of CopperList nodes in the causal graph.
    #[inline]
    pub fn total_nodes(&self) -> usize {
        self.nodes.len()
    }

    /// Number of nodes executed so far.
    #[inline]
    pub fn executed_nodes(&self) -> usize {
        self.executed_count
    }
}
1522
1523fn join_log_paths(logs: &[DistributedReplayLog]) -> String {
1524    logs.iter()
1525        .map(|log| log.base_path.display().to_string())
1526        .collect::<Vec<_>>()
1527        .join(", ")
1528}
1529
1530fn collect_candidate_base_paths(path: &Path, out: &mut BTreeSet<PathBuf>) -> CuResult<()> {
1531    if path.is_dir() {
1532        let mut entries = fs::read_dir(path)
1533            .map_err(|err| {
1534                CuError::new_with_cause(
1535                    &format!(
1536                        "Failed to read directory '{}' during distributed replay discovery",
1537                        path.display()
1538                    ),
1539                    err,
1540                )
1541            })?
1542            .collect::<Result<Vec<_>, _>>()
1543            .map_err(|err| {
1544                CuError::new_with_cause(
1545                    &format!(
1546                        "Failed to enumerate directory '{}' during distributed replay discovery",
1547                        path.display()
1548                    ),
1549                    err,
1550                )
1551            })?;
1552        entries.sort_by_key(|entry| entry.path());
1553        for entry in entries {
1554            collect_candidate_base_paths(&entry.path(), out)?;
1555        }
1556        return Ok(());
1557    }
1558
1559    if path
1560        .extension()
1561        .and_then(|ext| ext.to_str())
1562        .is_some_and(|ext| ext == "copper")
1563    {
1564        out.insert(normalize_candidate_log_base(path));
1565    }
1566
1567    Ok(())
1568}
1569
1570fn normalize_candidate_log_base(path: &Path) -> PathBuf {
1571    let Some(extension) = path.extension().and_then(|ext| ext.to_str()) else {
1572        return path.to_path_buf();
1573    };
1574    let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
1575        return path.to_path_buf();
1576    };
1577    let Some((base_stem, slab_suffix)) = stem.rsplit_once('_') else {
1578        return path.to_path_buf();
1579    };
1580
1581    if slab_suffix.is_empty() || !slab_suffix.chars().all(|c| c.is_ascii_digit()) {
1582        return path.to_path_buf();
1583    }
1584
1585    let mut normalized = path.to_path_buf();
1586    normalized.set_file_name(format!("{base_stem}.{extension}"));
1587    if slab_zero_path(&normalized).is_some_and(|slab_zero| slab_zero.exists()) {
1588        normalized
1589    } else {
1590        path.to_path_buf()
1591    }
1592}
1593
/// Derive the slab-zero file name (`<stem>_0.<ext>`) that sits next to a
/// logical log base path.
///
/// Returns `None` when the path lacks a UTF-8 file stem or extension.
fn slab_zero_path(base_path: &Path) -> Option<PathBuf> {
    let extension = base_path.extension()?.to_str()?;
    let stem = base_path.file_stem()?.to_str()?;
    Some(base_path.with_file_name(format!("{stem}_0.{extension}")))
}
1601
1602fn read_next_entry<T: bincode::Decode<()>>(src: &mut impl Read) -> CuResult<Option<T>> {
1603    match decode_from_std_read::<T, _, _>(src, standard()) {
1604        Ok(entry) => Ok(Some(entry)),
1605        Err(DecodeError::UnexpectedEnd { .. }) => Ok(None),
1606        Err(DecodeError::Io { inner, .. }) if inner.kind() == std::io::ErrorKind::UnexpectedEof => {
1607            Ok(None)
1608        }
1609        Err(err) => Err(CuError::new_with_cause(
1610            "Failed to decode bincode entry during distributed replay discovery",
1611            err,
1612        )),
1613    }
1614}
1615
1616#[cfg(test)]
1617mod tests {
1618    use super::*;
1619    use crate::app::{
1620        CuDistributedReplayApplication, CuRecordedReplayApplication, CuSimApplication,
1621        CuSubsystemMetadata,
1622    };
1623    use crate::config::CuConfig;
1624    use crate::copperlist::CopperList;
1625    use crate::curuntime::KeyFrame;
1626    use crate::simulation::SimOverride;
1627    use bincode::{Decode, Encode};
1628    use cu29_clock::CuTime;
1629    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks, WriteStream};
1630    use cu29_unifiedlog::memmap::MmapSectionStorage;
1631    use cu29_unifiedlog::stream_write;
1632    use serde::Serialize;
1633    use std::sync::{Arc, Mutex};
1634    use tempfile::TempDir;
1635
    /// Test fixture: write a minimal unified log at `base_path` containing a
    /// runtime-lifecycle `Instantiated` record (with the given `stack`) and,
    /// optionally, a `MissionStarted` record.
    ///
    /// Creates parent directories as needed and errors if a writable unified
    /// logger cannot be built.
    fn write_runtime_lifecycle_log(
        base_path: &Path,
        stack: RuntimeLifecycleStackInfo,
        mission: Option<&str>,
    ) -> CuResult<()> {
        if let Some(parent) = base_path.parent() {
            fs::create_dir_all(parent).map_err(|err| {
                CuError::new_with_cause(
                    &format!("Failed to create test log directory '{}'", parent.display()),
                    err,
                )
            })?;
        }

        let UnifiedLogger::Write(writer) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .preallocated_size(256 * 1024)
            .file_base_name(base_path)
            .build()
            .map_err(|err| {
                CuError::new_with_cause(
                    &format!("Failed to create test log '{}'", base_path.display()),
                    err,
                )
            })?
        else {
            return Err(CuError::from("Expected writable unified logger in test"));
        };

        let logger = Arc::new(Mutex::new(writer));
        let mut stream = stream_write::<RuntimeLifecycleRecord, MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::RuntimeLifecycle,
            4096,
        )?;
        // First record: the instantiation event carrying the stack identity
        // and a tiny placeholder effective config.
        stream.log(&RuntimeLifecycleRecord {
            timestamp: CuTime::default(),
            event: RuntimeLifecycleEvent::Instantiated {
                config_source: RuntimeLifecycleConfigSource::ExternalFile,
                effective_config_ron: "(runtime: ())".to_string(),
                stack,
            },
        })?;
        // Optional second record: a mission start one nanosecond later.
        if let Some(mission) = mission {
            stream.log(&RuntimeLifecycleRecord {
                timestamp: CuTime::from_nanos(1),
                event: RuntimeLifecycleEvent::MissionStarted {
                    mission: mission.to_string(),
                },
            })?;
        }
        // Explicit drop order: stream before logger — presumably so the
        // stream's section is finalized before the logger closes (NOTE:
        // confirm against stream_write's contract).
        drop(stream);
        drop(logger);
        Ok(())
    }
1692
1693    fn test_stack(
1694        subsystem_id: &str,
1695        subsystem_code: u16,
1696        instance_id: u32,
1697    ) -> RuntimeLifecycleStackInfo {
1698        RuntimeLifecycleStackInfo {
1699            app_name: "demo".to_string(),
1700            app_version: "0.1.0".to_string(),
1701            git_commit: Some("abc123".to_string()),
1702            git_dirty: Some(false),
1703            subsystem_id: Some(subsystem_id.to_string()),
1704            subsystem_code,
1705            instance_id,
1706        }
1707    }
1708
1709    fn write_multi_config_fixture(temp_dir: &TempDir, subsystem_ids: &[&str]) -> CuResult<PathBuf> {
1710        for subsystem_id in subsystem_ids {
1711            let subsystem_config = temp_dir.path().join(format!("{subsystem_id}_config.ron"));
1712            fs::write(&subsystem_config, "(tasks: [], cnx: [])").map_err(|err| {
1713                CuError::new_with_cause(
1714                    &format!(
1715                        "Failed to write subsystem config '{}'",
1716                        subsystem_config.display()
1717                    ),
1718                    err,
1719                )
1720            })?;
1721        }
1722
1723        let subsystem_entries = subsystem_ids
1724            .iter()
1725            .map(|subsystem_id| {
1726                format!(
1727                    r#"(
1728            id: "{subsystem_id}",
1729            config: "{subsystem_id}_config.ron",
1730        )"#
1731                )
1732            })
1733            .collect::<Vec<_>>()
1734            .join(",\n");
1735
1736        let multi_config = format!(
1737            "(\n    subsystems: [\n{entries}\n    ],\n    interconnects: [],\n)\n",
1738            entries = subsystem_entries
1739        );
1740        let multi_config_path = temp_dir.path().join("multi_copper.ron");
1741        fs::write(&multi_config_path, multi_config).map_err(|err| {
1742            CuError::new_with_cause(
1743                &format!(
1744                    "Failed to write multi-Copper config '{}'",
1745                    multi_config_path.display()
1746                ),
1747                err,
1748            )
1749        })?;
1750        Ok(multi_config_path)
1751    }
1752
    /// Empty stand-in for a generated recorded data set: carries no messages
    /// and matches no task ids.
    #[derive(Debug, Default, Encode, Decode, Serialize)]
    struct DummyRecordedDataSet;

    impl ErasedCuStampedDataSet for DummyRecordedDataSet {
        // The dummy set carries no stamped messages.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for DummyRecordedDataSet {
        // No task ids are associated with the dummy set.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }
1767
    /// Generates a unit-struct test app implementing `CuSubsystemMetadata`,
    /// `CuSimApplication`, `CuRecordedReplayApplication`, and
    /// `CuDistributedReplayApplication` with no-op bodies, so plan-builder
    /// tests can register apps without depending on generated code.
    macro_rules! impl_registered_test_app {
        ($name:ident, $subsystem_id:expr, $subsystem_code:expr) => {
            struct $name;

            // Identity the plan builder validates against the multi-config.
            impl CuSubsystemMetadata for $name {
                fn subsystem() -> Subsystem {
                    Subsystem::new(Some($subsystem_id), $subsystem_code)
                }
            }

            // Simulation surface: every lifecycle hook succeeds without doing work.
            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuSimApplication<S, L> for $name
            {
                type Step<'z> = ();

                fn get_original_config() -> String {
                    "(tasks: [], cnx: [])".to_string()
                }

                fn start_all_tasks(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn run_one_iteration(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn run(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn stop_all_tasks(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn restore_keyframe(&mut self, _freezer: &KeyFrame) -> CuResult<()> {
                    Ok(())
                }
            }

            // Recorded replay surface: accepts any copperlist/keyframe and succeeds.
            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuRecordedReplayApplication<S, L> for $name
            {
                type RecordedDataSet = DummyRecordedDataSet;

                fn replay_recorded_copperlist(
                    &mut self,
                    _clock_mock: &RobotClockMock,
                    _copperlist: &CopperList<Self::RecordedDataSet>,
                    _keyframe: Option<&KeyFrame>,
                ) -> CuResult<()> {
                    Ok(())
                }
            }

            // Distributed replay surface: construction ignores all inputs.
            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuDistributedReplayApplication<S, L> for $name
            {
                fn build_distributed_replay(
                    _clock: RobotClock,
                    _unified_logger: Arc<Mutex<L>>,
                    _instance_id: u32,
                    _config_override: Option<CuConfig>,
                ) -> CuResult<Self> {
                    Ok(Self)
                }
            }
        };
    }
1849
    // Two valid registered test apps, plus one whose declared subsystem code
    // (99) deliberately disagrees with the "ping" subsystem's configured code.
    impl_registered_test_app!(PingRegisteredApp, "ping", 0);
    impl_registered_test_app!(PongRegisteredApp, "pong", 1);
    impl_registered_test_app!(PingWrongCodeApp, "ping", 99);
1853
    /// Replay session stub whose operations all succeed without doing anything.
    struct FakeReplaySession;

    impl DistributedReplaySession for FakeReplaySession {
        // Seeking to any copperlist id is a no-op.
        fn goto_cl(&mut self, _cl_id: u64) -> CuResult<()> {
            Ok(())
        }

        // Shutdown is likewise a no-op.
        fn shutdown(&mut self) -> CuResult<()> {
            Ok(())
        }
    }
1865
1866    fn fake_registration(
1867        subsystem_id: &'static str,
1868        subsystem_code: u16,
1869        session_factory: DistributedReplaySessionFactory,
1870    ) -> DistributedReplayAppRegistration {
1871        DistributedReplayAppRegistration {
1872            subsystem: Subsystem::new(Some(subsystem_id), subsystem_code),
1873            app_type_name: "fake",
1874            session_factory,
1875        }
1876    }
1877
1878    fn fake_assignment(
1879        instance_id: u32,
1880        subsystem_id: &'static str,
1881        subsystem_code: u16,
1882        session_factory: DistributedReplaySessionFactory,
1883    ) -> DistributedReplayAssignment {
1884        DistributedReplayAssignment {
1885            instance_id,
1886            subsystem_id: subsystem_id.to_string(),
1887            log: DistributedReplayLog {
1888                base_path: PathBuf::from(format!("{subsystem_id}_{instance_id}.copper")),
1889                stack: test_stack(subsystem_id, subsystem_code, instance_id),
1890                config_source: RuntimeLifecycleConfigSource::ExternalFile,
1891                effective_config_ron: "(tasks: [], cnx: [])".to_string(),
1892                mission: Some("default".to_string()),
1893            },
1894            registration: fake_registration(subsystem_id, subsystem_code, session_factory),
1895        }
1896    }
1897
1898    fn fake_plan(assignments: Vec<DistributedReplayAssignment>) -> DistributedReplayPlan {
1899        let mut registrations: Vec<_> = assignments
1900            .iter()
1901            .map(|assignment| assignment.registration.clone())
1902            .collect();
1903        registrations.sort_by(|left, right| left.subsystem.id().cmp(&right.subsystem.id()));
1904        let mut selected_instances: Vec<_> = assignments
1905            .iter()
1906            .map(|assignment| assignment.instance_id)
1907            .collect::<BTreeSet<_>>()
1908            .into_iter()
1909            .collect();
1910        selected_instances.sort_unstable();
1911        DistributedReplayPlan {
1912            multi_config_path: PathBuf::from("fake_multi.ron"),
1913            multi_config: MultiCopperConfig {
1914                subsystems: Vec::new(),
1915                interconnects: Vec::new(),
1916                instance_overrides_root: None,
1917            },
1918            catalog: DistributedReplayCatalog::default(),
1919            selected_instances,
1920            mission: Some("default".to_string()),
1921            registrations,
1922            assignments,
1923        }
1924    }
1925
1926    fn fake_ping_session(
1927        assignment: &DistributedReplayAssignment,
1928        _session_config: &DistributedReplaySessionConfig,
1929    ) -> CuResult<DistributedReplaySessionBuild> {
1930        Ok(DistributedReplaySessionBuild {
1931            session: Box::new(FakeReplaySession),
1932            nodes: vec![
1933                DistributedReplayNodeDescriptor {
1934                    cursor: DistributedReplayCursor::new(
1935                        assignment.instance_id,
1936                        assignment.subsystem_id.clone(),
1937                        assignment.log.subsystem_code(),
1938                        0,
1939                    ),
1940                    origin_key: DistributedReplayOriginKey {
1941                        instance_id: assignment.instance_id,
1942                        subsystem_code: assignment.log.subsystem_code(),
1943                        cl_id: 0,
1944                    },
1945                    incoming_origins: BTreeSet::new(),
1946                },
1947                DistributedReplayNodeDescriptor {
1948                    cursor: DistributedReplayCursor::new(
1949                        assignment.instance_id,
1950                        assignment.subsystem_id.clone(),
1951                        assignment.log.subsystem_code(),
1952                        1,
1953                    ),
1954                    origin_key: DistributedReplayOriginKey {
1955                        instance_id: assignment.instance_id,
1956                        subsystem_code: assignment.log.subsystem_code(),
1957                        cl_id: 1,
1958                    },
1959                    incoming_origins: BTreeSet::new(),
1960                },
1961            ],
1962            output_log_path: None,
1963        })
1964    }
1965
1966    fn fake_pong_session(
1967        assignment: &DistributedReplayAssignment,
1968        _session_config: &DistributedReplaySessionConfig,
1969    ) -> CuResult<DistributedReplaySessionBuild> {
1970        Ok(DistributedReplaySessionBuild {
1971            session: Box::new(FakeReplaySession),
1972            nodes: vec![
1973                DistributedReplayNodeDescriptor {
1974                    cursor: DistributedReplayCursor::new(
1975                        assignment.instance_id,
1976                        assignment.subsystem_id.clone(),
1977                        assignment.log.subsystem_code(),
1978                        0,
1979                    ),
1980                    origin_key: DistributedReplayOriginKey {
1981                        instance_id: assignment.instance_id,
1982                        subsystem_code: assignment.log.subsystem_code(),
1983                        cl_id: 0,
1984                    },
1985                    incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
1986                        instance_id: assignment.instance_id,
1987                        subsystem_code: 0,
1988                        cl_id: 0,
1989                    }]),
1990                },
1991                DistributedReplayNodeDescriptor {
1992                    cursor: DistributedReplayCursor::new(
1993                        assignment.instance_id,
1994                        assignment.subsystem_id.clone(),
1995                        assignment.log.subsystem_code(),
1996                        1,
1997                    ),
1998                    origin_key: DistributedReplayOriginKey {
1999                        instance_id: assignment.instance_id,
2000                        subsystem_code: assignment.log.subsystem_code(),
2001                        cl_id: 1,
2002                    },
2003                    incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
2004                        instance_id: assignment.instance_id,
2005                        subsystem_code: 0,
2006                        cl_id: 1,
2007                    }]),
2008                },
2009            ],
2010            output_log_path: None,
2011        })
2012    }
2013
2014    fn fake_bad_pong_session(
2015        assignment: &DistributedReplayAssignment,
2016        _session_config: &DistributedReplaySessionConfig,
2017    ) -> CuResult<DistributedReplaySessionBuild> {
2018        Ok(DistributedReplaySessionBuild {
2019            session: Box::new(FakeReplaySession),
2020            nodes: vec![DistributedReplayNodeDescriptor {
2021                cursor: DistributedReplayCursor::new(
2022                    assignment.instance_id,
2023                    assignment.subsystem_id.clone(),
2024                    assignment.log.subsystem_code(),
2025                    0,
2026                ),
2027                origin_key: DistributedReplayOriginKey {
2028                    instance_id: assignment.instance_id,
2029                    subsystem_code: assignment.log.subsystem_code(),
2030                    cl_id: 0,
2031                },
2032                incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
2033                    instance_id: assignment.instance_id,
2034                    subsystem_code: 0,
2035                    cl_id: 99,
2036                }]),
2037            }],
2038            output_log_path: None,
2039        })
2040    }
2041
    /// Synthetic four-stage pipeline used by the stress tests, as
    /// `(subsystem_id, subsystem_code)` pairs.
    const STRESS_SUBSYSTEMS: [(&str, u16); 4] =
        [("sense", 0), ("plan", 1), ("control", 2), ("telemetry", 3)];
2044
2045    fn stress_origins_for(
2046        subsystem_id: &str,
2047        instance_id: u32,
2048        cl_id: u64,
2049    ) -> BTreeSet<DistributedReplayOriginKey> {
2050        match subsystem_id {
2051            "sense" => BTreeSet::new(),
2052            "plan" => BTreeSet::from([DistributedReplayOriginKey {
2053                instance_id,
2054                subsystem_code: 0,
2055                cl_id,
2056            }]),
2057            "control" => BTreeSet::from([DistributedReplayOriginKey {
2058                instance_id,
2059                subsystem_code: 1,
2060                cl_id,
2061            }]),
2062            "telemetry" => BTreeSet::from([
2063                DistributedReplayOriginKey {
2064                    instance_id,
2065                    subsystem_code: 0,
2066                    cl_id,
2067                },
2068                DistributedReplayOriginKey {
2069                    instance_id,
2070                    subsystem_code: 2,
2071                    cl_id,
2072                },
2073            ]),
2074            _ => panic!("unexpected synthetic stress subsystem '{subsystem_id}'"),
2075        }
2076    }
2077
2078    fn build_stress_session(
2079        assignment: &DistributedReplayAssignment,
2080        _session_config: &DistributedReplaySessionConfig,
2081        cl_count: u64,
2082    ) -> CuResult<DistributedReplaySessionBuild> {
2083        let subsystem_code = assignment.log.subsystem_code();
2084        let nodes = (0..cl_count)
2085            .map(|cl_id| DistributedReplayNodeDescriptor {
2086                cursor: DistributedReplayCursor::new(
2087                    assignment.instance_id,
2088                    assignment.subsystem_id.clone(),
2089                    subsystem_code,
2090                    cl_id,
2091                ),
2092                origin_key: DistributedReplayOriginKey {
2093                    instance_id: assignment.instance_id,
2094                    subsystem_code,
2095                    cl_id,
2096                },
2097                incoming_origins: stress_origins_for(
2098                    &assignment.subsystem_id,
2099                    assignment.instance_id,
2100                    cl_id,
2101                ),
2102            })
2103            .collect();
2104        Ok(DistributedReplaySessionBuild {
2105            session: Box::new(FakeReplaySession),
2106            nodes,
2107            output_log_path: None,
2108        })
2109    }
2110
    // The three stress factories differ only in how many copperlists they
    // synthesize per session.

    /// Fixed-size factory: 24 copperlists per session.
    fn stress_session_ci(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 24)
    }

    /// Fixed-size factory: 32 copperlists per session.
    fn stress_session_goto(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 32)
    }

    /// Fixed-size factory: 96 copperlists per session.
    fn stress_session_heavy(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 96)
    }
2131
2132    fn stress_plan(
2133        instance_count: u32,
2134        session_factory: DistributedReplaySessionFactory,
2135    ) -> DistributedReplayPlan {
2136        let assignments = (1..=instance_count)
2137            .flat_map(|instance_id| {
2138                STRESS_SUBSYSTEMS
2139                    .into_iter()
2140                    .map(move |(subsystem_id, subsystem_code)| {
2141                        fake_assignment(instance_id, subsystem_id, subsystem_code, session_factory)
2142                    })
2143            })
2144            .collect();
2145        fake_plan(assignments)
2146    }
2147
2148    fn collect_engine_order(
2149        engine: &mut DistributedReplayEngine,
2150    ) -> CuResult<Vec<DistributedReplayCursor>> {
2151        let mut order = Vec::new();
2152        while let Some(cursor) = engine.step_causal()? {
2153            order.push(cursor);
2154        }
2155        Ok(order)
2156    }
2157
    /// Verifies that a replay order covers every stress node exactly once and
    /// respects both per-subsystem copperlist order and the cross-subsystem
    /// sense → plan → control and {sense, control} → telemetry dependencies.
    fn assert_stress_order_is_topological(
        order: &[DistributedReplayCursor],
        instance_count: u32,
        cl_count: u64,
    ) {
        // Every (instance, subsystem, cl) node must appear exactly once.
        let expected_len = instance_count as usize * STRESS_SUBSYSTEMS.len() * cl_count as usize;
        assert_eq!(order.len(), expected_len);

        // Map each node identity to its position in the replay order.
        let positions: BTreeMap<_, _> = order
            .iter()
            .enumerate()
            .map(|(idx, cursor)| {
                (
                    (
                        cursor.instance_id,
                        cursor.subsystem_id.clone(),
                        cursor.cl_id,
                    ),
                    idx,
                )
            })
            .collect();
        // Equal lengths imply there were no duplicate node identities.
        assert_eq!(positions.len(), expected_len);

        for instance_id in 1..=instance_count {
            // Within one subsystem, copperlists must replay in ascending cl order.
            for (subsystem_id, _) in STRESS_SUBSYSTEMS {
                for cl_id in 1..cl_count {
                    let previous = positions
                        .get(&(instance_id, subsystem_id.to_string(), cl_id - 1))
                        .expect("previous local node missing");
                    let current = positions
                        .get(&(instance_id, subsystem_id.to_string(), cl_id))
                        .expect("current local node missing");
                    assert!(
                        previous < current,
                        "local order violated for instance {instance_id} subsystem '{subsystem_id}' cl {cl_id}"
                    );
                }
            }

            // Across subsystems at the same cl id, producers must precede consumers
            // (mirrors the provenance wired up by `stress_origins_for`).
            for cl_id in 0..cl_count {
                let sense = positions
                    .get(&(instance_id, "sense".to_string(), cl_id))
                    .expect("sense node missing");
                let plan = positions
                    .get(&(instance_id, "plan".to_string(), cl_id))
                    .expect("plan node missing");
                let control = positions
                    .get(&(instance_id, "control".to_string(), cl_id))
                    .expect("control node missing");
                let telemetry = positions
                    .get(&(instance_id, "telemetry".to_string(), cl_id))
                    .expect("telemetry node missing");
                assert!(sense < plan);
                assert!(plan < control);
                assert!(sense < telemetry);
                assert!(control < telemetry);
            }
        }
    }
2218
    /// A log written with known stack info is rediscovered with the same
    /// subsystem id/code, instance id, and mission.
    #[test]
    fn discovers_single_log_identity_from_runtime_lifecycle() -> CuResult<()> {
        let temp_dir = TempDir::new()
            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
        let base_path = temp_dir.path().join("logs/ping.copper");
        write_runtime_lifecycle_log(&base_path, test_stack("ping", 7, 42), Some("default"))?;

        let discovered = DistributedReplayLog::discover(&base_path)?;
        assert_eq!(discovered.base_path, base_path);
        assert_eq!(discovered.subsystem_id(), Some("ping"));
        assert_eq!(discovered.subsystem_code(), 7);
        assert_eq!(discovered.instance_id(), 42);
        assert_eq!(discovered.mission.as_deref(), Some("default"));
        Ok(())
    }
2234
    /// Passing both a base path and its `_0` slab path yields a single catalog
    /// entry keyed on the base path — slab paths are normalized and deduplicated.
    #[test]
    fn catalog_discovery_normalizes_slab_paths_and_deduplicates_candidates() -> CuResult<()> {
        let temp_dir = TempDir::new()
            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
        let base_path = temp_dir.path().join("logs/pong.copper");
        let slab_zero_path = temp_dir.path().join("logs/pong_0.copper");
        write_runtime_lifecycle_log(&base_path, test_stack("pong", 3, 9), Some("default"))?;

        let catalog = DistributedReplayCatalog::discover([base_path.clone(), slab_zero_path])?;
        assert!(
            catalog.failures.is_empty(),
            "unexpected failures: {:?}",
            catalog.failures
        );
        assert_eq!(catalog.logs.len(), 1);
        assert_eq!(catalog.logs[0].base_path, base_path);
        assert_eq!(catalog.logs[0].subsystem_id(), Some("pong"));
        Ok(())
    }
2254
    /// Directory-tree discovery finds both logs under the root, with no
    /// failures, ordered by subsystem id.
    #[test]
    fn catalog_discovery_walks_directories_using_physical_slab_files() -> CuResult<()> {
        let temp_dir = TempDir::new()
            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
        let ping_base = temp_dir.path().join("logs/ping.copper");
        let pong_base = temp_dir.path().join("logs/pong.copper");
        write_runtime_lifecycle_log(&ping_base, test_stack("ping", 0, 1), Some("alpha"))?;
        write_runtime_lifecycle_log(&pong_base, test_stack("pong", 1, 1), Some("alpha"))?;

        let catalog = DistributedReplayCatalog::discover_under(temp_dir.path())?;
        assert!(
            catalog.failures.is_empty(),
            "unexpected failures: {:?}",
            catalog.failures
        );
        assert_eq!(catalog.logs.len(), 2);
        assert_eq!(catalog.logs[0].subsystem_id(), Some("ping"));
        assert_eq!(catalog.logs[1].subsystem_id(), Some("pong"));
        assert_eq!(catalog.logs[0].base_path, ping_base);
        assert_eq!(catalog.logs[1].base_path, pong_base);
        Ok(())
    }
2277
    /// An unreadable slab is recorded as a discovery failure (keyed on its
    /// normalized base path) while valid logs are still cataloged.
    #[test]
    fn catalog_reports_invalid_logs_without_aborting_scan() -> CuResult<()> {
        let temp_dir = TempDir::new()
            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
        let good_base = temp_dir.path().join("logs/good.copper");
        write_runtime_lifecycle_log(&good_base, test_stack("good", 2, 5), Some("beta"))?;

        // Plant a slab file that is not a valid Copper log.
        let bad_slab = temp_dir.path().join("logs/bad_0.copper");
        if let Some(parent) = bad_slab.parent() {
            fs::create_dir_all(parent).map_err(|err| {
                CuError::new_with_cause(
                    &format!("Failed to create bad log dir '{}'", parent.display()),
                    err,
                )
            })?;
        }
        fs::write(&bad_slab, b"not a copper log").map_err(|err| {
            CuError::new_with_cause(
                &format!("Failed to create bad log '{}'", bad_slab.display()),
                err,
            )
        })?;

        let catalog = DistributedReplayCatalog::discover_under(temp_dir.path())?;
        assert_eq!(catalog.logs.len(), 1);
        assert_eq!(catalog.failures.len(), 1);
        assert_eq!(catalog.logs[0].subsystem_id(), Some("good"));
        // The failure reports the normalized base path, not the slab path.
        assert_eq!(
            catalog.failures[0].candidate_path,
            temp_dir.path().join("logs/bad.copper")
        );
        Ok(())
    }
2311
    /// With logs for instances 1 and 2 on disk, selecting only instance 2
    /// yields a plan whose assignments point at instance 2's logs.
    #[test]
    fn builder_builds_validated_plan_for_selected_instances() -> CuResult<()> {
        let temp_dir = TempDir::new()
            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
        let logs_root = temp_dir.path().join("logs");

        // Two complete instances, each with a ping and a pong log.
        write_runtime_lifecycle_log(
            &logs_root.join("instance1_ping.copper"),
            test_stack("ping", 0, 1),
            Some("default"),
        )?;
        write_runtime_lifecycle_log(
            &logs_root.join("instance1_pong.copper"),
            test_stack("pong", 1, 1),
            Some("default"),
        )?;
        write_runtime_lifecycle_log(
            &logs_root.join("instance2_ping.copper"),
            test_stack("ping", 0, 2),
            Some("default"),
        )?;
        write_runtime_lifecycle_log(
            &logs_root.join("instance2_pong.copper"),
            test_stack("pong", 1, 2),
            Some("default"),
        )?;

        let plan = DistributedReplayPlan::builder(&multi_config_path)?
            .discover_logs_under(&logs_root)?
            .register::<PingRegisteredApp>("ping")?
            .register::<PongRegisteredApp>("pong")?
            .instances([2])
            .build()?;

        assert_eq!(plan.selected_instances, vec![2]);
        assert_eq!(plan.mission.as_deref(), Some("default"));
        assert_eq!(plan.assignments.len(), 2);
        assert_eq!(
            plan.assignment(2, "ping").unwrap().log.base_path,
            logs_root.join("instance2_ping.copper")
        );
        assert_eq!(
            plan.assignment(2, "pong").unwrap().log.base_path,
            logs_root.join("instance2_pong.copper")
        );
        Ok(())
    }
2360
    /// Registering an app whose declared subsystem code (99) disagrees with the
    /// configured "ping" subsystem fails at registration time.
    #[test]
    fn register_rejects_subsystem_code_mismatch() -> CuResult<()> {
        let temp_dir = TempDir::new()
            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;

        let err = DistributedReplayPlan::builder(&multi_config_path)?
            .register::<PingWrongCodeApp>("ping")
            .unwrap_err();
        assert!(err.to_string().contains("declares subsystem code 99"));
        Ok(())
    }
2373
    /// A build with only the ping log and only the ping registration reports
    /// both the missing pong registration and the missing pong log in one error.
    #[test]
    fn build_reports_missing_logs_and_missing_registrations() -> CuResult<()> {
        let temp_dir = TempDir::new()
            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
        let logs_root = temp_dir.path().join("logs");

        // Only ping is present; pong has neither a log nor a registration.
        write_runtime_lifecycle_log(
            &logs_root.join("instance1_ping.copper"),
            test_stack("ping", 0, 1),
            Some("default"),
        )?;

        let err = DistributedReplayPlan::builder(&multi_config_path)?
            .discover_logs_under(&logs_root)?
            .register::<PingRegisteredApp>("ping")?
            .build()
            .unwrap_err();
        let err_text = err.to_string();
        assert!(err_text.contains("missing app registration for subsystem 'pong'"));
        assert!(err_text.contains("missing log for instance 1 subsystem 'pong'"));
        Ok(())
    }
2397
    /// Two distinct logs claiming the same `(instance, subsystem)` identity
    /// cause the build to fail with a duplicate-log error.
    #[test]
    fn build_reports_duplicate_logs_for_one_target() -> CuResult<()> {
        let temp_dir = TempDir::new()
            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
        let logs_root = temp_dir.path().join("logs");

        // Two different files both claim (instance 1, "ping").
        write_runtime_lifecycle_log(
            &logs_root.join("instance1_ping_a.copper"),
            test_stack("ping", 0, 1),
            Some("default"),
        )?;
        write_runtime_lifecycle_log(
            &logs_root.join("instance1_ping_b.copper"),
            test_stack("ping", 0, 1),
            Some("default"),
        )?;
        write_runtime_lifecycle_log(
            &logs_root.join("instance1_pong.copper"),
            test_stack("pong", 1, 1),
            Some("default"),
        )?;

        let err = DistributedReplayPlan::builder(&multi_config_path)?
            .discover_logs_under(&logs_root)?
            .register::<PingRegisteredApp>("ping")?
            .register::<PongRegisteredApp>("pong")?
            .build()
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("found 2 logs for instance 1 subsystem 'ping'")
        );
        Ok(())
    }
2433
2434    #[test]
2435    fn build_reports_mission_mismatch_across_selected_logs() -> CuResult<()> {
2436        let temp_dir = TempDir::new()
2437            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2438        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2439        let logs_root = temp_dir.path().join("logs");
2440
2441        write_runtime_lifecycle_log(
2442            &logs_root.join("instance1_ping.copper"),
2443            test_stack("ping", 0, 1),
2444            Some("default"),
2445        )?;
2446        write_runtime_lifecycle_log(
2447            &logs_root.join("instance1_pong.copper"),
2448            test_stack("pong", 1, 1),
2449            Some("recovery"),
2450        )?;
2451
2452        let err = DistributedReplayPlan::builder(&multi_config_path)?
2453            .discover_logs_under(&logs_root)?
2454            .register::<PingRegisteredApp>("ping")?
2455            .register::<PongRegisteredApp>("pong")?
2456            .build()
2457            .unwrap_err();
2458        assert!(
2459            err.to_string()
2460                .contains("selected logs disagree on mission: default, recovery")
2461        );
2462        Ok(())
2463    }
2464
2465    #[test]
2466    fn engine_steps_in_stable_causal_order() -> CuResult<()> {
2467        let plan = fake_plan(vec![
2468            fake_assignment(1, "ping", 0, fake_ping_session),
2469            fake_assignment(1, "pong", 1, fake_pong_session),
2470        ]);
2471
2472        let mut engine = plan.start()?;
2473        let mut order = Vec::new();
2474        while let Some(cursor) = engine.step_causal()? {
2475            order.push((cursor.subsystem_id, cursor.cl_id));
2476        }
2477
2478        assert_eq!(
2479            order,
2480            vec![
2481                ("ping".to_string(), 0),
2482                ("ping".to_string(), 1),
2483                ("pong".to_string(), 0),
2484                ("pong".to_string(), 1),
2485            ]
2486        );
2487        assert_eq!(engine.executed_nodes(), 4);
2488        Ok(())
2489    }
2490
2491    #[test]
2492    fn engine_goto_rebuilds_and_replays_to_target() -> CuResult<()> {
2493        let plan = fake_plan(vec![
2494            fake_assignment(1, "ping", 0, fake_ping_session),
2495            fake_assignment(1, "pong", 1, fake_pong_session),
2496        ]);
2497
2498        let mut engine = plan.start()?;
2499        engine.run_all()?;
2500        engine.goto(1, "pong", 0)?;
2501
2502        assert_eq!(engine.executed_nodes(), 3);
2503        let frontier = engine.current_frontier();
2504        assert_eq!(frontier.len(), 2);
2505        assert!(frontier.iter().any(|cursor| {
2506            cursor.instance_id == 1 && cursor.subsystem_id == "ping" && cursor.cl_id == 1
2507        }));
2508        assert!(frontier.iter().any(|cursor| {
2509            cursor.instance_id == 1 && cursor.subsystem_id == "pong" && cursor.cl_id == 0
2510        }));
2511        Ok(())
2512    }
2513
2514    #[test]
2515    fn engine_reports_unresolved_recorded_provenance() -> CuResult<()> {
2516        let plan = fake_plan(vec![
2517            fake_assignment(1, "ping", 0, fake_ping_session),
2518            fake_assignment(1, "pong", 1, fake_bad_pong_session),
2519        ]);
2520
2521        let err = match plan.start() {
2522            Ok(_) => return Err(CuError::from("expected distributed replay startup failure")),
2523            Err(err) => err,
2524        };
2525        assert!(
2526            err.to_string()
2527                .contains("Unresolved recorded provenance edge")
2528        );
2529        Ok(())
2530    }
2531
2532    #[test]
2533    fn engine_run_all_scales_across_many_identical_instances() -> CuResult<()> {
2534        let mut engine = stress_plan(6, stress_session_ci).start()?;
2535        let order = collect_engine_order(&mut engine)?;
2536
2537        assert_stress_order_is_topological(&order, 6, 24);
2538        assert_eq!(engine.executed_nodes(), 6 * STRESS_SUBSYSTEMS.len() * 24);
2539
2540        let frontier = engine.current_frontier();
2541        assert_eq!(frontier.len(), 6 * STRESS_SUBSYSTEMS.len());
2542        for instance_id in 1..=6 {
2543            for (subsystem_id, _) in STRESS_SUBSYSTEMS {
2544                assert!(frontier.iter().any(|cursor| {
2545                    cursor.instance_id == instance_id
2546                        && cursor.subsystem_id == subsystem_id
2547                        && cursor.cl_id == 23
2548                }));
2549            }
2550        }
2551        Ok(())
2552    }
2553
2554    #[test]
2555    fn engine_goto_matches_manual_replay_on_large_graph() -> CuResult<()> {
2556        let plan = stress_plan(5, stress_session_goto);
2557        let mut manual = plan.clone().start()?;
2558
2559        let (expected_steps, expected_frontier) = {
2560            let mut expected_steps = 0usize;
2561            loop {
2562                let Some(cursor) = manual.step_causal()? else {
2563                    return Err(CuError::from(
2564                        "manual distributed replay exhausted before reaching stress target",
2565                    ));
2566                };
2567                expected_steps += 1;
2568                if cursor.instance_id == 4 && cursor.subsystem_id == "control" && cursor.cl_id == 17
2569                {
2570                    break (expected_steps, manual.current_frontier());
2571                }
2572            }
2573        };
2574
2575        let mut via_goto = plan.start()?;
2576        via_goto.goto(4, "control", 17)?;
2577
2578        assert_eq!(via_goto.executed_nodes(), expected_steps);
2579        assert_eq!(via_goto.current_frontier(), expected_frontier);
2580        Ok(())
2581    }
2582
2583    #[test]
2584    #[ignore = "stress"]
2585    fn engine_heavy_stress_run_all_completes() -> CuResult<()> {
2586        let mut engine = stress_plan(12, stress_session_heavy).start()?;
2587        engine.run_all()?;
2588
2589        let expected = 12 * STRESS_SUBSYSTEMS.len() * 96;
2590        assert_eq!(engine.executed_nodes(), expected);
2591        assert_eq!(
2592            engine.current_frontier().len(),
2593            12 * STRESS_SUBSYSTEMS.len()
2594        );
2595        Ok(())
2596    }
2597}