// cu29_runtime — curuntime.rs
1//! CuRuntime is the heart of what copper is running on the robot.
2//! It is exposed to the user via the `copper_runtime` macro injecting it as a field in their application struct.
3//!
4
5use crate::app::Subsystem;
6use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
7use crate::config::{CuConfig, CuGraph, MAX_RATE_TARGET_HZ, NodeId, RuntimeConfig};
8use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
9use crate::cutask::{BincodeAdapter, Freezable};
10#[cfg(feature = "std")]
11use crate::monitoring::ExecutionProbeHandle;
12#[cfg(feature = "std")]
13use crate::monitoring::MonitorExecutionProbe;
14use crate::monitoring::{
15    ComponentId, CopperListInfo, CuMonitor, CuMonitoringMetadata, CuMonitoringRuntime,
16    ExecutionMarker, MonitorComponentMetadata, RuntimeExecutionProbe, build_monitor_topology,
17    take_last_completed_handle_bytes,
18};
19#[cfg(all(feature = "std", feature = "parallel-rt"))]
20use crate::parallel_rt::{ParallelRt, ParallelRtMetadata};
21use crate::resource::ResourceManager;
22use compact_str::CompactString;
23use cu29_clock::{ClockProvider, CuDuration, CuTime, RobotClock};
24use cu29_traits::CuResult;
25use cu29_traits::WriteStream;
26use cu29_traits::{CopperListTuple, CuError};
27
28#[cfg(target_os = "none")]
29#[allow(unused_imports)]
30use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
31#[cfg(target_os = "none")]
32#[allow(unused_imports)]
33use cu29_log_derive::info;
34#[cfg(target_os = "none")]
35#[allow(unused_imports)]
36use cu29_log_runtime::log;
37#[cfg(all(target_os = "none", debug_assertions))]
38#[allow(unused_imports)]
39use cu29_log_runtime::log_debug_mode;
40#[cfg(target_os = "none")]
41#[allow(unused_imports)]
42use cu29_value::to_value;
43
44#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
45use alloc::alloc::{alloc_zeroed, handle_alloc_error};
46use alloc::boxed::Box;
47use alloc::collections::{BTreeSet, VecDeque};
48use alloc::format;
49use alloc::string::{String, ToString};
50use alloc::vec::Vec;
51use bincode::enc::EncoderImpl;
52use bincode::enc::write::{SizeWriter, SliceWriter};
53use bincode::error::EncodeError;
54use bincode::{Decode, Encode};
55#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
56use core::alloc::Layout;
57use core::fmt::Result as FmtResult;
58use core::fmt::{Debug, Formatter};
59use core::marker::PhantomData;
60
61#[cfg(all(feature = "std", feature = "async-cl-io"))]
62use std::sync::mpsc::{Receiver, SyncSender, TryRecvError, sync_channel};
63#[cfg(all(feature = "std", feature = "async-cl-io"))]
64use std::thread::JoinHandle;
65
/// Signature of the generated function that instantiates all tasks from their
/// per-node configurations and the shared resource manager.
pub type TasksInstantiator<CT> =
    for<'c> fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>;
/// Signature of the generated function that instantiates all bridges.
pub type BridgesInstantiator<CB> = fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>;
/// Signature of the generated function that instantiates the monitor.
pub type MonitorInstantiator<M> = fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M;
70
/// Compile-time pieces generated for an application: the instantiator
/// functions plus the static monitoring metadata the runtime needs.
pub struct CuRuntimeParts<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI> {
    /// Generated function building the task tuple `CT`.
    pub tasks_instanciator: TI,
    /// Static metadata describing each monitored component.
    pub monitored_components: &'static [MonitorComponentMetadata],
    /// Maps CopperList entries to their component ids.
    pub culist_component_mapping: &'static [ComponentId],
    /// Static metadata for the parallel runtime scheduler.
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    pub parallel_rt_metadata: &'static ParallelRtMetadata,
    /// Generated function building the monitor `M`.
    pub monitor_instanciator: MI,
    /// Generated function building the bridge tuple `CB`.
    pub bridges_instanciator: BI,
    // Carries the type parameters that are not stored in any field.
    _payload: PhantomData<(CT, CB, P, M, [(); NBCL])>,
}
81
impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI>
    CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>
{
    /// Bundles the generated instantiators and static monitoring metadata.
    ///
    /// `const` so generated code can build this in a constant context.
    /// The `parallel_rt_metadata` parameter only exists when both `std`
    /// and `parallel-rt` are enabled.
    pub const fn new(
        tasks_instanciator: TI,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: MI,
        bridges_instanciator: BI,
    ) -> Self {
        Self {
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
            _payload: PhantomData,
        }
    }
}
106
/// Builder collecting everything needed to construct a runtime: clock,
/// configuration, generated parts and the two log writers.
pub struct CuRuntimeBuilder<
    'cfg,
    CT,
    CB,
    P: CopperListTuple,
    M: CuMonitor,
    const NBCL: usize,
    TI,
    BI,
    MI,
    CLW,
    KFW,
> {
    /// Clock driving the runtime.
    clock: RobotClock,
    /// Parsed application configuration.
    config: &'cfg CuConfig,
    /// Selected mission name.
    mission: &'cfg str,
    /// Subsystem identity; defaults to `Subsystem::new(None, 0)`.
    subsystem: Subsystem,
    /// Instance id; defaults to 0.
    instance_id: u32,
    /// Optional pre-built resource manager.
    resources: Option<ResourceManager>,
    /// Generated instantiators and monitoring metadata.
    parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
    /// Writer for serialized CopperLists.
    copperlists_logger: CLW,
    /// Writer for serialized keyframes.
    keyframes_logger: KFW,
}
130
131impl<'cfg, CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize, TI, BI, MI, CLW, KFW>
132    CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
133{
134    pub fn new(
135        clock: RobotClock,
136        config: &'cfg CuConfig,
137        mission: &'cfg str,
138        parts: CuRuntimeParts<CT, CB, P, M, NBCL, TI, BI, MI>,
139        copperlists_logger: CLW,
140        keyframes_logger: KFW,
141    ) -> Self {
142        Self {
143            clock,
144            config,
145            mission,
146            subsystem: Subsystem::new(None, 0),
147            instance_id: 0,
148            resources: None,
149            parts,
150            copperlists_logger,
151            keyframes_logger,
152        }
153    }
154
155    pub fn with_subsystem(mut self, subsystem: Subsystem) -> Self {
156        self.subsystem = subsystem;
157        self
158    }
159
160    pub fn with_instance_id(mut self, instance_id: u32) -> Self {
161        self.instance_id = instance_id;
162        self
163    }
164
165    pub fn with_resources(mut self, resources: ResourceManager) -> Self {
166        self.resources = Some(resources);
167        self
168    }
169
170    pub fn try_with_resources_instantiator(
171        mut self,
172        resources_instantiator: impl FnOnce(&CuConfig) -> CuResult<ResourceManager>,
173    ) -> CuResult<Self> {
174        self.resources = Some(resources_instantiator(self.config)?);
175        Ok(self)
176    }
177}
178
/// Returns a monotonic instant used for local runtime performance timing.
///
/// When `sysclock-perf` (and `std`) are enabled this uses a process-local
/// `RobotClock::new()` instance for timing. The returned value is a
/// monotonically increasing duration since an unspecified origin (typically
/// process or runtime initialization), not a wall-clock time-of-day. When
/// `sysclock-perf` is disabled it delegates to the provided `RobotClock`.
///
/// This is intentionally separate from `LoopRateLimiter`, which always uses the
/// provided `RobotClock` so `runtime.rate_target_hz` stays tied to robot time.
#[inline]
pub fn perf_now(_clock: &RobotClock) -> CuTime {
    #[cfg(all(feature = "std", feature = "sysclock-perf"))]
    {
        // One lazily-created perf clock shared by the whole process.
        static PERF_CLOCK: std::sync::OnceLock<RobotClock> = std::sync::OnceLock::new();
        return PERF_CLOCK.get_or_init(RobotClock::new).now();
    }

    // Only reached when the sysclock-perf branch above is compiled out.
    #[allow(unreachable_code)]
    _clock.now()
}
200
/// Within this window (200 µs) of the deadline the high-precision limiter
/// busy-spins instead of sleeping, trading CPU for wakeup accuracy.
#[cfg(all(feature = "std", feature = "high-precision-limiter"))]
const HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS: u64 = 200_000;
203
204/// Convert a configured runtime rate target to an integer-nanosecond period.
205#[inline]
206pub fn rate_target_period(rate_target_hz: u64) -> CuResult<CuDuration> {
207    if rate_target_hz == 0 {
208        return Err(CuError::from(
209            "Runtime rate target cannot be zero. Set runtime.rate_target_hz to at least 1.",
210        ));
211    }
212
213    if rate_target_hz > MAX_RATE_TARGET_HZ {
214        return Err(CuError::from(format!(
215            "Runtime rate target ({rate_target_hz} Hz) exceeds the supported maximum of {MAX_RATE_TARGET_HZ} Hz."
216        )));
217    }
218
219    Ok(CuDuration::from(MAX_RATE_TARGET_HZ / rate_target_hz))
220}
221
/// Runtime loop limiter that preserves phase with absolute deadlines.
///
/// This is intentionally a small runtime helper so generated applications do
/// not have to open-code loop scheduling policy. Deadlines are tracked against
/// the provided `RobotClock`, even when `sysclock-perf` is enabled for
/// process-time measurements.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LoopRateLimiter {
    /// Nominal period between iterations (integer nanoseconds).
    period: CuDuration,
    /// Absolute time of the next deadline, in robot-clock time.
    next_deadline: CuTime,
}
233
impl LoopRateLimiter {
    /// Creates a limiter from `runtime.rate_target_hz`; the first deadline
    /// is one period after `clock.now()`.
    #[inline]
    pub fn from_rate_target_hz(rate_target_hz: u64, clock: &RobotClock) -> CuResult<Self> {
        let period = rate_target_period(rate_target_hz)?;
        Ok(Self {
            period,
            next_deadline: clock.now() + period,
        })
    }

    /// True once the current deadline has been reached or passed.
    #[inline]
    pub fn is_ready(&self, clock: &RobotClock) -> bool {
        self.remaining(clock).is_none()
    }

    /// Time left until the next deadline, or `None` if it already passed.
    #[inline]
    pub fn remaining(&self, clock: &RobotClock) -> Option<CuDuration> {
        let now = clock.now();
        if now < self.next_deadline {
            Some(self.next_deadline - now)
        } else {
            None
        }
    }

    /// Blocks until the next deadline. The strategy is feature-dependent:
    /// sleep-then-spin (`high-precision-limiter`), plain sleep (`std` only),
    /// or pure spin (no_std).
    #[inline]
    pub fn wait_until_ready(&self, clock: &RobotClock) {
        let deadline = self.next_deadline;
        let Some(remaining) = self.remaining(clock) else {
            // Deadline already passed: nothing to wait for.
            return;
        };

        #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
        {
            // Sleep for the bulk of the wait, then spin the final window to
            // hit the deadline with better-than-sleep accuracy.
            let spin_window = self.spin_window();
            if remaining > spin_window {
                std::thread::sleep(std::time::Duration::from(remaining - spin_window));
            }
            while clock.now() < deadline {
                core::hint::spin_loop();
            }
        }

        #[cfg(all(feature = "std", not(feature = "high-precision-limiter")))]
        {
            let _ = deadline;
            std::thread::sleep(std::time::Duration::from(remaining));
        }

        #[cfg(not(feature = "std"))]
        {
            let _ = remaining;
            while clock.now() < deadline {
                core::hint::spin_loop();
            }
        }
    }

    /// Records that an iteration ran now, advancing the deadline.
    #[inline]
    pub fn mark_tick(&mut self, clock: &RobotClock) {
        self.advance_from(clock.now());
    }

    /// Convenience: wait for the deadline, then advance it.
    #[inline]
    pub fn limit(&mut self, clock: &RobotClock) {
        self.wait_until_ready(clock);
        self.mark_tick(clock);
    }

    /// Advances the deadline by whole periods. On an overrun the missed
    /// deadlines are skipped (no catch-up burst) while the original phase
    /// is preserved, since steps are counted from the old deadline.
    #[inline]
    fn advance_from(&mut self, now: CuTime) {
        let steps = if now < self.next_deadline {
            1
        } else {
            (now - self.next_deadline).as_nanos() / self.period.as_nanos() + 1
        };
        self.next_deadline += steps * self.period;
    }

    /// Fixed-size spin window used by the high-precision strategy.
    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
    #[inline]
    fn spin_window(&self) -> CuDuration {
        let _ = self.period;
        CuDuration::from(HIGH_PRECISION_LIMITER_SPIN_WINDOW_NS)
    }

    /// Test-only accessor for the current deadline.
    #[cfg(test)]
    #[inline]
    fn next_deadline(&self) -> CuTime {
        self.next_deadline
    }
}
326
/// Marker bound for CopperList payloads on the async I/O path. When
/// `std` + `async-cl-io` are enabled the payload must be `Send` (it crosses
/// into the serializer thread); otherwise the bound is a no-op so the same
/// generated code compiles either way.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
#[doc(hidden)]
pub trait AsyncCopperListPayload: Send {}

#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<T: Send> AsyncCopperListPayload for T {}

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
#[doc(hidden)]
pub trait AsyncCopperListPayload {}

#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
impl<T> AsyncCopperListPayload for T {}
340
/// Control-flow result returned by one generated process stage.
///
/// `AbortCopperList` preserves the current runtime semantics for monitor
/// decisions that abort the current CopperList without shutting the runtime
/// down. The outer driver remains responsible for ordered cleanup and log
/// handoff.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ProcessStepOutcome {
    /// Proceed to the next stage of the wave.
    Continue,
    /// Drop the current CopperList but keep the runtime alive.
    AbortCopperList,
}

/// Result type used by generated process-step functions.
pub type ProcessStepResult = CuResult<ProcessStepOutcome>;
355
/// Serializes a completed CopperList into a standalone byte buffer for the
/// remote-debug snapshot (bincode, standard configuration).
#[cfg(feature = "remote-debug")]
fn encode_completed_copperlist_snapshot<P: CopperListTuple>(
    cl: &CopperList<P>,
) -> CuResult<Vec<u8>> {
    bincode::encode_to_vec(cl, bincode::config::standard())
        .map_err(|e| CuError::new_with_cause("Failed to encode completed CopperList snapshot", e))
}
363
/// Manages the lifecycle of the copper lists and logging on the synchronous path.
pub struct SyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Fixed ring of NBCL CopperList slots.
    inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
    /// Remote-debug snapshot of the most recently completed CopperList.
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    /// Last encoded size returned by logger.log
    pub last_encoded_bytes: u64,
    /// Last handle-backed payload bytes observed during logger.log
    pub last_handle_bytes: u64,
}
377
impl<P: CopperListTuple + Default, const NBCL: usize> SyncCopperListsManager<P, NBCL> {
    /// Creates the manager over a fresh ring of NBCL lists; `logger` is the
    /// optional destination for serialized CopperLists.
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit,
    {
        Ok(Self {
            inner: CuListsManager::new(),
            logger,
            #[cfg(feature = "remote-debug")]
            last_completed_encoded: None,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Id the next created CopperList will receive.
    pub fn next_cl_id(&self) -> u64 {
        self.inner.next_cl_id()
    }

    /// Id of the most recently created CopperList.
    pub fn last_cl_id(&self) -> u64 {
        self.inner.last_cl_id()
    }

    /// The most recently created live CopperList, if any.
    pub fn peek(&self) -> Option<&CopperList<P>> {
        self.inner.peek()
    }

    /// Encoded snapshot of the last completed CopperList (remote-debug only).
    #[cfg(feature = "remote-debug")]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        self.last_completed_encoded.as_deref()
    }

    /// Always `None` when remote-debug is compiled out.
    #[cfg(not(feature = "remote-debug"))]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        None
    }

    /// Replaces the remote-debug snapshot (e.g. set externally by a driver).
    #[cfg(feature = "remote-debug")]
    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
        self.last_completed_encoded = snapshot;
    }

    /// No-op when remote-debug is compiled out.
    #[cfg(not(feature = "remote-debug"))]
    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}

    /// Allocates the next CopperList slot from the ring.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
        self.inner
            .create()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))
    }

    /// Marks `culistid` as done, then logs and frees the contiguous run of
    /// finished lists at the front of the ring. Lists can finish out of
    /// order; only a done prefix is released.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        // NOTE(review): `last_handle_bytes` is reset here but
        // `last_encoded_bytes` is not, unlike `end_of_processing_boxed`
        // which resets both — confirm whether keeping the stale value
        // when nothing is logged is intended.
        self.last_handle_bytes = 0;
        #[cfg(feature = "remote-debug")]
        let last_completed_encoded = &mut self.last_completed_encoded;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
                // cfg-dispatch on a unit match so either arm compiles
                // without duplicating the surrounding loop.
                match () {
                    #[cfg(feature = "remote-debug")]
                    () => {
                        *last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
                    }
                    #[cfg(not(feature = "remote-debug"))]
                    () => {}
                }
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
                    self.last_handle_bytes = take_last_completed_handle_bytes();
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                // The first unfinished list blocks everything behind it.
                is_top = false;
            }
        }
        // Release the freed prefix back to the ring.
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Nothing is ever pending on the synchronous path.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        Ok(())
    }

    /// Number of free slots remaining in the ring.
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        Ok(NBCL - self.inner.len())
    }

    /// Logs (if a logger exists) and immediately recycles an owned boxed
    /// CopperList — the synchronous counterpart of the async submission.
    #[cfg(feature = "std")]
    pub fn end_of_processing_boxed(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        culist.change_state(CopperListState::DoneProcessing);
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;
        if let Some(logger) = &mut self.logger {
            culist.change_state(CopperListState::BeingSerialized);
            logger.log(&culist)?;
            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
            self.last_handle_bytes = take_last_completed_handle_bytes();
        }
        culist.change_state(CopperListState::Free);
        Ok(OwnedCopperListSubmission::Recycled(culist))
    }

    /// Nothing to reclaim: boxed submissions recycle immediately here.
    #[cfg(feature = "std")]
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        Ok(None)
    }

    /// Blocking reclamation is impossible without an async worker.
    #[cfg(feature = "std")]
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        Err(CuError::from(
            "Synchronous CopperList I/O cannot block waiting for boxed completions",
        ))
    }

    /// No queued work exists on the synchronous path.
    #[cfg(feature = "std")]
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        Ok(Vec::new())
    }
}
509
/// Result of handing an owned boxed CopperList to the runtime-side CL I/O path.
/// Only the asynchronous manager ever produces `Pending`; the synchronous
/// manager always recycles in place.
#[cfg(feature = "std")]
pub enum OwnedCopperListSubmission<P: CopperListTuple> {
    /// The CL has been fully handled and can be recycled immediately by the caller.
    Recycled(Box<CopperList<P>>),
    /// The CL was queued asynchronously and will be returned by a later reclaim call.
    Pending,
}
518
/// Message the serializer thread sends back for each processed CopperList.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
struct AsyncCopperListCompletion<P: CopperListTuple> {
    /// The boxed CopperList whose serialization finished.
    culist: Box<CopperList<P>>,
    /// `Ok((encoded_bytes, handle_bytes))` on success, the logging error otherwise.
    log_result: CuResult<(u64, u64)>,
}
524
/// Heap-allocates one zero-initialized boxed CopperList, avoiding a large
/// stack temporary that `Box::new(CopperList::default())` would create.
#[cfg(all(feature = "std", any(feature = "async-cl-io", feature = "parallel-rt")))]
fn allocate_zeroed_copperlist<P>() -> Box<CopperList<P>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // SAFETY: We allocate zeroed memory and immediately initialize required fields.
    // The `CuListZeroedInit` bound vouches that the payload is valid when
    // zeroed plus `init_zeroed()`; allocation failure aborts via
    // `handle_alloc_error` so `Box::from_raw` never sees a null pointer.
    let mut culist = unsafe {
        let layout = Layout::new::<CopperList<P>>();
        let ptr = alloc_zeroed(layout) as *mut CopperList<P>;
        if ptr.is_null() {
            handle_alloc_error(layout);
        }
        Box::from_raw(ptr)
    };
    culist.msgs.init_zeroed();
    culist
}
542
/// Pre-allocates the pool of NBCL zero-initialized boxed CopperLists used by
/// the parallel runtime.
#[cfg(all(feature = "std", feature = "parallel-rt"))]
pub fn allocate_boxed_copperlists<P, const NBCL: usize>() -> Vec<Box<CopperList<P>>>
where
    P: CopperListTuple + CuListZeroedInit,
{
    // The range's exact size hint lets `collect` reserve all NBCL slots
    // up front, matching an explicit `with_capacity` loop.
    (0..NBCL)
        .map(|_| allocate_zeroed_copperlist::<P>())
        .collect()
}
554
/// Manages the lifecycle of the copper lists and logging on the asynchronous path.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
pub struct AsyncCopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Pool of recycled boxed CopperLists available for the next `create`.
    free_pool: Vec<Box<CopperList<P>>>,
    /// The CopperList currently being filled, if any.
    current: Option<Box<CopperList<P>>>,
    /// Remote-debug snapshot of the most recently completed CopperList.
    #[cfg(feature = "remote-debug")]
    last_completed_encoded: Option<Vec<u8>>,
    /// Number of lists queued on the serializer thread but not yet reclaimed.
    pending_count: usize,
    /// Id to stamp on the next created CopperList.
    next_cl_id: u64,
    /// Sends lists to the serializer thread; `None` when no logger exists.
    pending_sender: Option<SyncSender<Box<CopperList<P>>>>,
    /// Receives serialized lists (and their log results) back.
    completion_receiver: Option<Receiver<AsyncCopperListCompletion<P>>>,
    /// Join handle of the `cu-async-cl-io` serializer thread.
    worker_handle: Option<JoinHandle<()>>,
    /// Last encoded size returned by logger.log
    pub last_encoded_bytes: u64,
    /// Last handle-backed payload bytes observed during logger.log
    pub last_handle_bytes: u64,
}
572
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> AsyncCopperListsManager<P, NBCL> {
    /// Creates the manager, pre-allocating NBCL boxed CopperLists and — when
    /// a logger is provided — spawning the `cu-async-cl-io` serializer thread
    /// wired up with two bounded channels (pending work and completions).
    pub fn new(logger: Option<Box<dyn WriteStream<CopperList<P>>>>) -> CuResult<Self>
    where
        P: CuListZeroedInit + AsyncCopperListPayload + 'static,
    {
        let mut free_pool = Vec::with_capacity(NBCL);
        for _ in 0..NBCL {
            free_pool.push(allocate_zeroed_copperlist::<P>());
        }

        let (pending_sender, completion_receiver, worker_handle) = if let Some(mut logger) = logger
        {
            // Bounded to NBCL so the producer can never outrun the pool.
            let (pending_sender, pending_receiver) = sync_channel::<Box<CopperList<P>>>(NBCL);
            let (completion_sender, completion_receiver) =
                sync_channel::<AsyncCopperListCompletion<P>>(NBCL);
            let worker_handle = std::thread::Builder::new()
                .name("cu-async-cl-io".to_string())
                .spawn(move || {
                    // Serialize lists until the sender side is dropped.
                    while let Ok(mut culist) = pending_receiver.recv() {
                        culist.change_state(CopperListState::BeingSerialized);
                        let log_result = logger.log(&culist).map(|_| {
                            (
                                logger.last_log_bytes().unwrap_or(0) as u64,
                                take_last_completed_handle_bytes(),
                            )
                        });
                        // On a logging failure the error is shipped back
                        // inside the completion, then the worker stops.
                        let should_stop = log_result.is_err();
                        if completion_sender
                            .send(AsyncCopperListCompletion { culist, log_result })
                            .is_err()
                        {
                            break;
                        }
                        if should_stop {
                            break;
                        }
                    }
                })
                .map_err(|e| {
                    CuError::from("Failed to spawn async CopperList serializer thread")
                        .add_cause(e.to_string().as_str())
                })?;
            (
                Some(pending_sender),
                Some(completion_receiver),
                Some(worker_handle),
            )
        } else {
            // No logger: lists recycle locally, no worker thread needed.
            (None, None, None)
        };

        Ok(Self {
            free_pool,
            current: None,
            #[cfg(feature = "remote-debug")]
            last_completed_encoded: None,
            pending_count: 0,
            next_cl_id: 0,
            pending_sender,
            completion_receiver,
            worker_handle,
            last_encoded_bytes: 0,
            last_handle_bytes: 0,
        })
    }

    /// Id the next created CopperList will receive.
    pub fn next_cl_id(&self) -> u64 {
        self.next_cl_id
    }

    /// Id of the most recently created CopperList (0 if none was created).
    pub fn last_cl_id(&self) -> u64 {
        self.next_cl_id.saturating_sub(1)
    }

    /// The currently active CopperList, if one exists.
    pub fn peek(&self) -> Option<&CopperList<P>> {
        self.current.as_deref()
    }

    /// Encoded snapshot of the last completed CopperList (remote-debug only).
    #[cfg(feature = "remote-debug")]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        self.last_completed_encoded.as_deref()
    }

    /// Always `None` when remote-debug is compiled out.
    #[cfg(not(feature = "remote-debug"))]
    pub fn last_completed_encoded(&self) -> Option<&[u8]> {
        None
    }

    /// Replaces the remote-debug snapshot.
    #[cfg(feature = "remote-debug")]
    pub fn set_last_completed_encoded(&mut self, snapshot: Option<Vec<u8>>) {
        self.last_completed_encoded = snapshot;
    }

    /// No-op when remote-debug is compiled out.
    #[cfg(not(feature = "remote-debug"))]
    pub fn set_last_completed_encoded(&mut self, _snapshot: Option<Vec<u8>>) {}

    /// Takes a free CopperList, stamps it with the next id and activates it.
    /// Blocks on the serializer thread when the pool is exhausted.
    pub fn create(&mut self) -> CuResult<&mut CopperList<P>> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Attempted to create a CopperList while another one is still active",
            ));
        }

        // Drain finished completions first, then block until a slot frees.
        self.reclaim_completed()?;
        while self.free_pool.is_empty() {
            self.wait_for_completion()?;
        }

        let culist = self
            .free_pool
            .pop()
            .ok_or_else(|| CuError::from("Ran out of space for copper lists"))?;
        self.current = Some(culist);

        let current = self
            .current
            .as_mut()
            .expect("current CopperList is missing");
        current.id = self.next_cl_id;
        current.change_state(CopperListState::Initialized);
        self.next_cl_id += 1;
        Ok(current.as_mut())
    }

    /// Stores the remote-debug snapshot of a just-completed CopperList.
    #[cfg(feature = "remote-debug")]
    fn capture_completed_snapshot(&mut self, cl: &CopperList<P>) -> CuResult<()> {
        self.last_completed_encoded = Some(encode_completed_copperlist_snapshot(cl)?);
        Ok(())
    }

    /// No-op when remote-debug is compiled out.
    #[cfg(not(feature = "remote-debug"))]
    fn capture_completed_snapshot(&mut self, _cl: &CopperList<P>) -> CuResult<()> {
        Ok(())
    }

    /// Finishes the active CopperList: snapshots it, then either queues it
    /// on the serializer thread or recycles it directly when no logger
    /// exists. Fails if `culistid` is not the active list's id.
    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
        self.reclaim_completed()?;

        let mut culist = self.current.take().ok_or_else(|| {
            CuError::from("Attempted to finish processing without an active CopperList")
        })?;

        if culist.id != culistid {
            return Err(CuError::from(format!(
                "Attempted to finish CopperList #{culistid} while CopperList #{} is active",
                culist.id
            )));
        }

        culist.change_state(CopperListState::DoneProcessing);
        self.capture_completed_snapshot(&culist)?;
        // Byte counters are refreshed later by handle_completion, once the
        // async log actually finishes.
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;

        if let Some(pending_sender) = &self.pending_sender {
            culist.change_state(CopperListState::QueuedForSerialization);
            pending_sender.send(culist).map_err(|e| {
                CuError::from("Failed to enqueue CopperList for async serialization")
                    .add_cause(e.to_string().as_str())
            })?;
            self.pending_count += 1;
            self.reclaim_completed()?;
        } else {
            culist.change_state(CopperListState::Free);
            self.free_pool.push(culist);
        }

        Ok(())
    }

    /// Blocks until every queued CopperList has been serialized and
    /// reclaimed. Refuses to run while a list is still active.
    pub fn finish_pending(&mut self) -> CuResult<()> {
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }

        while self.pending_count > 0 {
            self.wait_for_completion()?;
        }
        Ok(())
    }

    /// Number of free slots, after draining any finished completions.
    pub fn available_copper_lists(&mut self) -> CuResult<usize> {
        self.reclaim_completed()?;
        Ok(self.free_pool.len())
    }

    /// Submits an owned boxed CopperList: queued on the serializer thread
    /// (returns `Pending`) or recycled immediately when no logger exists.
    pub fn end_of_processing_boxed(
        &mut self,
        mut culist: Box<CopperList<P>>,
    ) -> CuResult<OwnedCopperListSubmission<P>> {
        self.reclaim_completed()?;
        culist.change_state(CopperListState::DoneProcessing);
        self.capture_completed_snapshot(&culist)?;
        self.last_encoded_bytes = 0;
        self.last_handle_bytes = 0;

        if let Some(pending_sender) = &self.pending_sender {
            culist.change_state(CopperListState::QueuedForSerialization);
            pending_sender.send(culist).map_err(|e| {
                CuError::from("Failed to enqueue CopperList for async serialization")
                    .add_cause(e.to_string().as_str())
            })?;
            self.pending_count += 1;
            self.reclaim_completed()?;
            Ok(OwnedCopperListSubmission::Pending)
        } else {
            culist.change_state(CopperListState::Free);
            Ok(OwnedCopperListSubmission::Recycled(culist))
        }
    }

    /// Non-blocking reclaim: returns one completed list if one is ready,
    /// `None` if nothing finished yet or there is no worker.
    pub fn try_reclaim_boxed(&mut self) -> CuResult<Option<Box<CopperList<P>>>> {
        let recv_result = {
            let Some(completion_receiver) = self.completion_receiver.as_ref() else {
                return Ok(None);
            };
            completion_receiver.try_recv()
        };
        match recv_result {
            Ok(completion) => self.handle_completion(completion).map(Some),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Err(CuError::from(
                "Async CopperList serializer thread disconnected unexpectedly",
            )),
        }
    }

    /// Blocking reclaim: waits for the next completion from the serializer.
    pub fn wait_reclaim_boxed(&mut self) -> CuResult<Box<CopperList<P>>> {
        let completion = self
            .completion_receiver
            .as_ref()
            .ok_or_else(|| {
                CuError::from("No async CopperList serializer is active to return a free slot")
            })?
            .recv()
            .map_err(|e| {
                CuError::from("Failed to receive completion from async CopperList serializer")
                    .add_cause(e.to_string().as_str())
            })?;
        self.handle_completion(completion)
    }

    /// Drains every outstanding completion, returning the reclaimed lists.
    /// Refuses to run while a CopperList is still active.
    pub fn finish_pending_boxed(&mut self) -> CuResult<Vec<Box<CopperList<P>>>> {
        let mut reclaimed = Vec::with_capacity(self.pending_count);
        if self.current.is_some() {
            return Err(CuError::from(
                "Cannot flush CopperList I/O while a CopperList is still active",
            ));
        }
        while self.pending_count > 0 {
            reclaimed.push(self.wait_reclaim_boxed()?);
        }
        Ok(reclaimed)
    }

    /// Non-blocking: moves every already-finished completion back into the
    /// free pool.
    fn reclaim_completed(&mut self) -> CuResult<()> {
        loop {
            let Some(culist) = self.try_reclaim_boxed()? else {
                break;
            };
            self.free_pool.push(culist);
        }
        Ok(())
    }

    /// Blocking: waits for one completion and returns it to the free pool.
    fn wait_for_completion(&mut self) -> CuResult<()> {
        let culist = self.wait_reclaim_boxed()?;
        self.free_pool.push(culist);
        Ok(())
    }

    /// Bookkeeping for one completion: updates the byte counters on
    /// success, frees the list, then propagates any logging error. Note
    /// that on error the boxed list is dropped here rather than returned.
    fn handle_completion(
        &mut self,
        mut completion: AsyncCopperListCompletion<P>,
    ) -> CuResult<Box<CopperList<P>>> {
        self.pending_count = self.pending_count.saturating_sub(1);
        if let Ok((encoded_bytes, handle_bytes)) = completion.log_result.as_ref() {
            self.last_encoded_bytes = *encoded_bytes;
            self.last_handle_bytes = *handle_bytes;
        }
        completion.culist.change_state(CopperListState::Free);
        completion.log_result?;
        Ok(completion.culist)
    }

    /// Flushes pending work, drops the sender so the worker's recv loop
    /// exits, then joins the serializer thread.
    fn shutdown_worker(&mut self) -> CuResult<()> {
        self.finish_pending()?;
        self.pending_sender.take();
        if let Some(worker_handle) = self.worker_handle.take() {
            worker_handle.join().map_err(|_| {
                CuError::from("Async CopperList serializer thread panicked while joining")
            })?;
        }
        Ok(())
    }
}
872
#[cfg(all(feature = "std", feature = "async-cl-io"))]
impl<P: CopperListTuple + Default, const NBCL: usize> Drop for AsyncCopperListsManager<P, NBCL> {
    /// Best-effort teardown: errors are deliberately discarded because there is
    /// no way to report them from `drop` (and panicking here could abort).
    fn drop(&mut self) {
        let _ = self.shutdown_worker();
    }
}
879
/// Active CopperLists manager type: with `std` + `async-cl-io`, serialization
/// is offloaded to a background worker thread.
#[cfg(all(feature = "std", feature = "async-cl-io"))]
pub type CopperListsManager<P, const NBCL: usize> = AsyncCopperListsManager<P, NBCL>;

/// Fallback: CopperLists are serialized synchronously on the caller's thread.
#[cfg(not(all(feature = "std", feature = "async-cl-io")))]
pub type CopperListsManager<P, const NBCL: usize> = SyncCopperListsManager<P, NBCL>;
885
/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Optional override for the timestamp to stamp the next keyframe (used by deterministic replay).
    forced_timestamp: Option<CuTime>,

    /// If set, reuse this keyframe verbatim (e.g., during replay) instead of re-freezing state.
    locked: bool,

    /// Logger for the state of the tasks (frozen tasks). `None` disables keyframe capture entirely.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only for CopperList ids that are multiples of this interval.
    keyframe_interval: u32,

    /// Bytes written by the last keyframe log (reset to 0 when the last CL was not a keyframe).
    pub last_encoded_bytes: u64,
}
906
907impl KeyFramesManager {
908    fn is_keyframe(&self, culistid: u64) -> bool {
909        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
910    }
911
912    #[inline]
913    pub fn captures_keyframe(&self, culistid: u64) -> bool {
914        self.is_keyframe(culistid)
915    }
916
917    pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
918        if self.is_keyframe(culistid) {
919            // If a recorded keyframe was preloaded for this CL, keep it as-is.
920            if self.locked && self.inner.culistid == culistid {
921                return;
922            }
923            let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
924            self.inner.reset(culistid, ts);
925            self.locked = false;
926        }
927    }
928
929    /// Force the timestamp of the next keyframe to a given value.
930    #[cfg(feature = "std")]
931    pub fn set_forced_timestamp(&mut self, ts: CuTime) {
932        self.forced_timestamp = Some(ts);
933    }
934
935    pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
936        if self.is_keyframe(culistid) {
937            if self.locked {
938                // We are replaying a recorded keyframe verbatim; don't mutate it.
939                return Ok(0);
940            }
941            if self.inner.culistid != culistid {
942                return Err(CuError::from(format!(
943                    "Freezing task for culistid {} but current keyframe is {}",
944                    culistid, self.inner.culistid
945                )));
946            }
947            self.inner
948                .add_frozen_task(task)
949                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
950        } else {
951            Ok(0)
952        }
953    }
954
955    /// Generic helper to freeze any `Freezable` state (task or bridge) into the current keyframe.
956    pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
957        self.freeze_task(culistid, item)
958    }
959
960    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
961        if self.is_keyframe(culistid) {
962            let logger = self.logger.as_mut().unwrap();
963            logger.log(&self.inner)?;
964            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
965            // Clear the lock so the next CL can rebuild normally unless re-locked.
966            self.locked = false;
967            Ok(())
968        } else {
969            // Not a keyframe for this CL; ensure we don't carry stale sizes forward.
970            self.last_encoded_bytes = 0;
971            Ok(())
972        }
973    }
974
975    /// Preload a recorded keyframe so it is logged verbatim on the matching CL.
976    #[cfg(feature = "std")]
977    pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
978        self.inner = keyframe.clone();
979        self.forced_timestamp = Some(keyframe.timestamp);
980        self.locked = true;
981    }
982}
983
/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// CL is the type of the copper list, representing the input/output messages for all the tasks.
/// CB is the tuple of instantiated bridges, M the monitor implementation and
/// NBCL the number of CopperLists kept in flight.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// Compile-time subsystem identity for this Copper process.
    subsystem_code: u16,

    /// Deployment/runtime instance identity for this Copper process.
    pub instance_id: u32,

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    pub resources: ResourceManager,

    /// The runtime monitoring.
    pub monitor: M,

    /// Runtime-side execution progress probe for watchdog/diagnostic monitors.
    ///
    /// This probe is written from the generated execution plan before each component
    /// step. Monitors consume it asynchronously (typically from watchdog threads) to
    /// report the last known component/step/culist when the runtime appears stalled.
    /// On `std` this is a shared handle; on `no_std` the probe is stored inline.
    #[cfg(feature = "std")]
    pub execution_probe: ExecutionProbeHandle,
    #[cfg(not(feature = "std"))]
    pub execution_probe: RuntimeExecutionProbe,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// Feature-gated container for deterministic multi-CopperList execution.
    #[cfg(all(feature = "std", feature = "parallel-rt"))]
    pub parallel_rt: ParallelRt<NBCL>,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}
1032
1033/// To be able to share the clock we make the runtime a clock provider.
1034impl<
1035    CT,
1036    CB,
1037    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload,
1038    M: CuMonitor,
1039    const NBCL: usize,
1040> ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
1041{
1042    fn get_clock(&self) -> RobotClock {
1043        self.clock.clone()
1044    }
1045}
1046
1047impl<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> CuRuntime<CT, CB, P, M, NBCL> {
1048    /// Returns a clone of the runtime clock handle.
1049    #[inline]
1050    pub fn clock(&self) -> RobotClock {
1051        self.clock.clone()
1052    }
1053
1054    /// Returns the compile-time subsystem code for this process.
1055    #[inline]
1056    pub fn subsystem_code(&self) -> u16 {
1057        self.subsystem_code
1058    }
1059
1060    /// Returns the configured runtime instance id for this process.
1061    #[inline]
1062    pub fn instance_id(&self) -> u32 {
1063        self.instance_id
1064    }
1065}
1066
1067impl<
1068    'cfg,
1069    CT,
1070    CB,
1071    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
1072    M: CuMonitor,
1073    const NBCL: usize,
1074    TI,
1075    BI,
1076    MI,
1077    CLW,
1078    KFW,
1079> CuRuntimeBuilder<'cfg, CT, CB, P, M, NBCL, TI, BI, MI, CLW, KFW>
1080where
1081    TI: for<'c> Fn(Vec<Option<&'c ComponentConfig>>, &mut ResourceManager) -> CuResult<CT>,
1082    BI: Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
1083    MI: Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
1084    CLW: WriteStream<CopperList<P>> + 'static,
1085    KFW: WriteStream<KeyFrame> + 'static,
1086{
1087    pub fn build(self) -> CuResult<CuRuntime<CT, CB, P, M, NBCL>> {
1088        let Self {
1089            clock,
1090            config,
1091            mission,
1092            subsystem,
1093            instance_id,
1094            resources,
1095            parts,
1096            copperlists_logger,
1097            keyframes_logger,
1098        } = self;
1099        let mut resources =
1100            resources.ok_or_else(|| CuError::from("Resources missing from CuRuntimeBuilder"))?;
1101
1102        let graph = config.get_graph(Some(mission))?;
1103        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
1104            .get_all_nodes()
1105            .iter()
1106            .map(|(_, node)| node.get_instance_config())
1107            .collect();
1108
1109        let tasks = (parts.tasks_instanciator)(all_instances_configs, &mut resources)?;
1110
1111        #[cfg(feature = "std")]
1112        let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
1113        #[cfg(not(feature = "std"))]
1114        let execution_probe = RuntimeExecutionProbe::default();
1115        let monitor_metadata = CuMonitoringMetadata::new(
1116            CompactString::from(mission),
1117            parts.monitored_components,
1118            parts.culist_component_mapping,
1119            CopperListInfo::new(core::mem::size_of::<CopperList<P>>(), NBCL),
1120            build_monitor_topology(config, mission)?,
1121            None,
1122        )?
1123        .with_subsystem_id(subsystem.id())
1124        .with_instance_id(instance_id);
1125        #[cfg(feature = "std")]
1126        let monitor_runtime =
1127            CuMonitoringRuntime::new(MonitorExecutionProbe::from_shared(execution_probe.clone()));
1128        #[cfg(not(feature = "std"))]
1129        let monitor_runtime = CuMonitoringRuntime::unavailable();
1130        let monitor = (parts.monitor_instanciator)(config, monitor_metadata, monitor_runtime);
1131        let bridges = (parts.bridges_instanciator)(config, &mut resources)?;
1132
1133        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
1134            Some(logging_config) if logging_config.enable_task_logging => (
1135                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1136                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1137                logging_config.keyframe_interval.unwrap(),
1138            ),
1139            Some(_) => (None, None, 0),
1140            None => (
1141                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
1142                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
1143                DEFAULT_KEYFRAME_INTERVAL,
1144            ),
1145        };
1146
1147        let copperlists_manager = CopperListsManager::new(copperlists_logger)?;
1148        #[cfg(target_os = "none")]
1149        {
1150            let cl_size = core::mem::size_of::<CopperList<P>>();
1151            let total_bytes = cl_size.saturating_mul(NBCL);
1152            info!(
1153                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
1154                NBCL, cl_size, total_bytes
1155            );
1156        }
1157
1158        let keyframes_manager = KeyFramesManager {
1159            inner: KeyFrame::new(),
1160            logger: keyframes_logger,
1161            keyframe_interval,
1162            last_encoded_bytes: 0,
1163            forced_timestamp: None,
1164            locked: false,
1165        };
1166        #[cfg(all(feature = "std", feature = "parallel-rt"))]
1167        let parallel_rt = ParallelRt::new(parts.parallel_rt_metadata)?;
1168
1169        let runtime_config = config.runtime.clone().unwrap_or_default();
1170        runtime_config.validate()?;
1171
1172        Ok(CuRuntime {
1173            subsystem_code: subsystem.code(),
1174            instance_id,
1175            tasks,
1176            bridges,
1177            resources,
1178            monitor,
1179            execution_probe,
1180            clock,
1181            copperlists_manager,
1182            keyframes_manager,
1183            #[cfg(all(feature = "std", feature = "parallel-rt"))]
1184            parallel_rt,
1185            runtime_config,
1186        })
1187    }
1188}
1189
/// A KeyFrame is recording a snapshot of the tasks state before a given copperlist.
/// It is a double encapsulation: this one recording the culistid and another even in
/// bincode in the serialized_tasks.
#[derive(Clone, Encode, Decode)]
pub struct KeyFrame {
    /// The id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u64,
    /// The timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    /// The bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}
1202
1203impl KeyFrame {
1204    fn new() -> Self {
1205        KeyFrame {
1206            culistid: 0,
1207            timestamp: CuTime::default(),
1208            serialized_tasks: Vec::new(),
1209        }
1210    }
1211
1212    /// This is to be able to avoid reallocations
1213    fn reset(&mut self, culistid: u64, timestamp: CuTime) {
1214        self.culistid = culistid;
1215        self.timestamp = timestamp;
1216        self.serialized_tasks.clear();
1217    }
1218
1219    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
1220    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
1221        let cfg = bincode::config::standard();
1222        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
1223        BincodeAdapter(task).encode(&mut sizer)?;
1224        let need = sizer.into_writer().bytes_written as usize;
1225
1226        let start = self.serialized_tasks.len();
1227        self.serialized_tasks.resize(start + need, 0);
1228        let mut enc =
1229            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
1230        BincodeAdapter(task).encode(&mut enc)?;
1231        Ok(need)
1232    }
1233}
1234
/// Identifies where the effective runtime configuration came from.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    /// The configuration was set programmatically, overriding any file.
    ProgrammaticOverride,
    /// The configuration was loaded from an external file.
    ExternalFile,
    /// The configuration bundled with the application was used.
    BundledDefault,
}
1242
/// Stack and process identification metadata persisted in the runtime lifecycle log.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    /// Application name.
    pub app_name: String,
    /// Application version string.
    pub app_version: String,
    /// Git commit hash of the build, when available.
    pub git_commit: Option<String>,
    /// Whether the working tree was dirty at build time, when known.
    pub git_dirty: Option<bool>,
    /// Human-readable subsystem identifier, when one was assigned.
    pub subsystem_id: Option<String>,
    /// Compile-time subsystem code of this process.
    pub subsystem_code: u16,
    /// Deployment-time runtime instance id of this process.
    pub instance_id: u32,
}
1254
/// Runtime lifecycle events emitted in the dedicated lifecycle section.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    /// The runtime was constructed; records the effective config and stack identity.
    Instantiated {
        config_source: RuntimeLifecycleConfigSource,
        effective_config_ron: String,
        stack: RuntimeLifecycleStackInfo,
    },
    /// A mission's run loop started.
    MissionStarted {
        mission: String,
    },
    /// A mission's run loop stopped.
    MissionStopped {
        mission: String,
        // TODO(lifecycle): replace free-form reason with a typed stop reason enum once
        // std/no-std behavior and panic integration are split in a follow-up PR.
        reason: String,
    },
    // TODO(lifecycle): wire panic hook / no_std equivalent to emit this event consistently.
    /// A panic was observed, with its message and source location when known.
    Panic {
        message: String,
        file: Option<String>,
        line: Option<u32>,
        column: Option<u32>,
    },
    /// The runtime completed an orderly shutdown.
    ShutdownCompleted,
}
1281
/// One event record persisted in the `UnifiedLogType::RuntimeLifecycle` section.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    /// Robot-clock time at which the event was recorded.
    pub timestamp: CuTime,
    /// The lifecycle event itself.
    pub event: RuntimeLifecycleEvent,
}
1288
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + AsyncCopperListPayload + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    /// Records runtime execution progress in the shared probe.
    ///
    /// This is intentionally lightweight and does not call monitor callbacks.
    #[inline]
    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
        self.execution_probe.record(marker);
    }

    /// Returns a shared reference to the concrete runtime execution probe.
    ///
    /// The generated runtime uses this when it needs a uniform
    /// `&RuntimeExecutionProbe` view across `std` and `no_std` builds.
    #[inline]
    pub fn execution_probe_ref(&self) -> &RuntimeExecutionProbe {
        #[cfg(feature = "std")]
        {
            self.execution_probe.as_ref()
        }

        #[cfg(not(feature = "std"))]
        {
            &self.execution_probe
        }
    }

    /// Legacy constructor (std): builds the resource manager via
    /// `resources_instanciator`, then delegates to [`CuRuntimeBuilder`].
    // FIXME(gbin): this became REALLY ugly with no-std
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new(...).")]
    pub fn new(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .try_with_resources_instantiator(resources_instanciator)?
        .build()
    }

    /// Legacy constructor (std): like [`CuRuntime::new`] but takes an already
    /// constructed [`ResourceManager`].
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new_with_resources(...).")]
    pub fn new_with_resources(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        #[cfg(all(feature = "std", feature = "parallel-rt"))]
        parallel_rt_metadata: &'static ParallelRtMetadata,
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            #[cfg(all(feature = "std", feature = "parallel-rt"))]
            parallel_rt_metadata,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .with_resources(resources)
        .build()
    }

    /// Legacy constructor (no_std): same as the std variant minus the
    /// parallel-rt metadata parameter.
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new(...).")]
    pub fn new(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .try_with_resources_instantiator(resources_instanciator)?
        .build()
    }

    /// Legacy constructor (no_std): like the no_std `new` but takes an already
    /// constructed [`ResourceManager`].
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    #[deprecated(note = "Use CuRuntimeBuilder instead of CuRuntime::new_with_resources(...).")]
    pub fn new_with_resources(
        clock: RobotClock,
        subsystem_code: u16,
        config: &CuConfig,
        mission: &str,
        resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitored_components: &'static [MonitorComponentMetadata],
        culist_component_mapping: &'static [ComponentId],
        monitor_instanciator: impl Fn(&CuConfig, CuMonitoringMetadata, CuMonitoringRuntime) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let parts = CuRuntimeParts::new(
            tasks_instanciator,
            monitored_components,
            culist_component_mapping,
            monitor_instanciator,
            bridges_instanciator,
        );
        CuRuntimeBuilder::new(
            clock,
            config,
            mission,
            parts,
            copperlists_logger,
            keyframes_logger,
        )
        .with_subsystem(Subsystem::new(None, subsystem_code))
        .with_resources(resources)
        .build()
    }
}
1491
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// No incoming graph edges; only produces outputs (drivers).
    Source,
    /// Both consumes and produces messages (compute nodes).
    Regular,
    /// No outgoing graph edges; only consumes inputs (actuators).
    Sink,
}
1502
/// Output slot assigned to a task in the execution plan.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Index of this task's output slot in the copper list.
    pub culist_index: u32,
    /// Message type names produced into that slot, in their resolved order.
    pub msg_types: Vec<String>,
}
1508
/// One input binding of a task in the execution plan.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// Index in the copper list where the input message lives.
    pub culist_index: u32,
    /// Message type name of the input.
    pub msg_type: String,
    /// Position of the message within the producer's output pack.
    pub src_port: usize,
    /// Index of the config graph edge this input corresponds to.
    pub edge_id: usize,
}
1516
/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// the index in the copper list of the output message and its type
    /// (`None` for tasks that produce no output)
    pub output_msg_pack: Option<CuOutputPack>,
}
1532
1533impl Debug for CuExecutionStep {
1534    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1535        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
1536        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
1537        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
1538        f.write_str(
1539            format!(
1540                "              input_msg_types: {:?}\n",
1541                self.input_msg_indices_types
1542            )
1543            .as_str(),
1544        )?;
1545        f.write_str(format!("       output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
1546        Ok(())
1547    }
1548}
1549
/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of Execution units (loop or steps) that are executed
/// multiple times.
/// if loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    /// The units (steps or nested loops) executed on each iteration.
    pub steps: Vec<CuExecutionUnit>,
    /// Number of iterations; `None` means loop forever.
    pub loop_count: Option<u32>,
}
1558
1559impl Debug for CuExecutionLoop {
1560    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1561        f.write_str("CuExecutionLoop:\n")?;
1562        for step in &self.steps {
1563            match step {
1564                CuExecutionUnit::Step(step) => {
1565                    step.fmt(f)?;
1566                }
1567                CuExecutionUnit::Loop(l) => {
1568                    l.fmt(f)?;
1569                }
1570            }
1571        }
1572
1573        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
1574        Ok(())
1575    }
1576}
1577
/// One unit of the execution plan: either a single task step or a nested loop
/// of units.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(Box<CuExecutionStep>),
    Loop(CuExecutionLoop),
}
1584
1585fn find_output_pack_from_nodeid(
1586    node_id: NodeId,
1587    steps: &Vec<CuExecutionUnit>,
1588) -> Option<CuOutputPack> {
1589    for step in steps {
1590        match step {
1591            CuExecutionUnit::Loop(loop_unit) => {
1592                if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
1593                    return Some(output_pack);
1594                }
1595            }
1596            CuExecutionUnit::Step(step) => {
1597                if step.node_id == node_id {
1598                    return step.output_msg_pack.clone();
1599                }
1600            }
1601        }
1602    }
1603    None
1604}
1605
1606pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
1607    if graph.incoming_neighbor_count(node_id) == 0 {
1608        CuTaskType::Source
1609    } else if graph.outgoing_neighbor_count(node_id) == 0 {
1610        CuTaskType::Sink
1611    } else {
1612        CuTaskType::Regular
1613    }
1614}
1615
1616/// The connection id used here is the index of the config graph edge that equates to the wanted
1617/// connection.
1618fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
1619    input_msg_indices_types.sort_by_key(|input| input.edge_id);
1620}
1621
1622fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
1623    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
1624    edge_ids.sort();
1625
1626    let mut msg_order: Vec<(usize, String)> = Vec::new();
1627    let mut record_msg = |msg: String, order: usize| {
1628        if let Some((existing_order, _)) = msg_order
1629            .iter_mut()
1630            .find(|(_, existing_msg)| *existing_msg == msg)
1631        {
1632            if order < *existing_order {
1633                *existing_order = order;
1634            }
1635            return;
1636        }
1637        msg_order.push((order, msg));
1638    };
1639
1640    for edge_id in edge_ids {
1641        if let Some(edge) = graph.edge(edge_id) {
1642            let order = if edge.order == usize::MAX {
1643                edge_id
1644            } else {
1645                edge.order
1646            };
1647            record_msg(edge.msg.clone(), order);
1648        }
1649    }
1650    if let Some(node) = graph.get_node(node_id) {
1651        for (msg, order) in node.nc_outputs_with_order() {
1652            record_msg(msg.clone(), order);
1653        }
1654    }
1655
1656    msg_order.sort_by(|(order_a, msg_a), (order_b, msg_b)| {
1657        order_a.cmp(order_b).then_with(|| msg_a.cmp(msg_b))
1658    });
1659    msg_order.into_iter().map(|(_, msg)| msg).collect()
1660}
1661/// Explores a subbranch and build the partial plan out of it.
1662fn plan_tasks_tree_branch(
1663    graph: &CuGraph,
1664    mut next_culist_output_index: u32,
1665    starting_point: NodeId,
1666    plan: &mut Vec<CuExecutionUnit>,
1667) -> (u32, bool) {
1668    #[cfg(all(feature = "std", feature = "macro_debug"))]
1669    eprintln!("-- starting branch from node {starting_point}");
1670
1671    let mut handled = false;
1672
1673    for id in graph.bfs_nodes(starting_point) {
1674        let node_ref = graph.get_node(id).unwrap();
1675        #[cfg(all(feature = "std", feature = "macro_debug"))]
1676        eprintln!("  Visiting node: {node_ref:?}");
1677
1678        let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
1679        let output_msg_pack: Option<CuOutputPack>;
1680        let task_type = find_task_type_for_id(graph, id);
1681
1682        match task_type {
1683            CuTaskType::Source => {
1684                #[cfg(all(feature = "std", feature = "macro_debug"))]
1685                eprintln!("    → Source node, assign output index {next_culist_output_index}");
1686                let msg_types = collect_output_msg_types(graph, id);
1687                if msg_types.is_empty() {
1688                    panic!(
1689                        "Source node '{}' has no outgoing connections",
1690                        node_ref.get_id()
1691                    );
1692                }
1693                output_msg_pack = Some(CuOutputPack {
1694                    culist_index: next_culist_output_index,
1695                    msg_types,
1696                });
1697                next_culist_output_index += 1;
1698            }
1699            CuTaskType::Sink => {
1700                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1701                edge_ids.sort();
1702                #[cfg(all(feature = "std", feature = "macro_debug"))]
1703                eprintln!("    → Sink with incoming edges: {edge_ids:?}");
1704                for edge_id in edge_ids {
1705                    let edge = graph
1706                        .edge(edge_id)
1707                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1708                    let pid = graph
1709                        .get_node_id_by_name(edge.src.as_str())
1710                        .unwrap_or_else(|| {
1711                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1712                        });
1713                    let output_pack = find_output_pack_from_nodeid(pid, plan);
1714                    if let Some(output_pack) = output_pack {
1715                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1716                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
1717                        let msg_type = edge.msg.as_str();
1718                        let src_port = output_pack
1719                            .msg_types
1720                            .iter()
1721                            .position(|msg| msg == msg_type)
1722                            .unwrap_or_else(|| {
1723                                panic!(
1724                                    "Missing output port for message type '{msg_type}' on node {pid}"
1725                                )
1726                            });
1727                        input_msg_indices_types.push(CuInputMsg {
1728                            culist_index: output_pack.culist_index,
1729                            msg_type: msg_type.to_string(),
1730                            src_port,
1731                            edge_id,
1732                        });
1733                    } else {
1734                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1735                        eprintln!("      ✗ Input from {pid} not ready, returning");
1736                        return (next_culist_output_index, handled);
1737                    }
1738                }
1739                output_msg_pack = Some(CuOutputPack {
1740                    culist_index: next_culist_output_index,
1741                    msg_types: Vec::from(["()".to_string()]),
1742                });
1743                next_culist_output_index += 1;
1744            }
1745            CuTaskType::Regular => {
1746                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
1747                edge_ids.sort();
1748                #[cfg(all(feature = "std", feature = "macro_debug"))]
1749                eprintln!("    → Regular task with incoming edges: {edge_ids:?}");
1750                for edge_id in edge_ids {
1751                    let edge = graph
1752                        .edge(edge_id)
1753                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
1754                    let pid = graph
1755                        .get_node_id_by_name(edge.src.as_str())
1756                        .unwrap_or_else(|| {
1757                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
1758                        });
1759                    let output_pack = find_output_pack_from_nodeid(pid, plan);
1760                    if let Some(output_pack) = output_pack {
1761                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1762                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
1763                        let msg_type = edge.msg.as_str();
1764                        let src_port = output_pack
1765                            .msg_types
1766                            .iter()
1767                            .position(|msg| msg == msg_type)
1768                            .unwrap_or_else(|| {
1769                                panic!(
1770                                    "Missing output port for message type '{msg_type}' on node {pid}"
1771                                )
1772                            });
1773                        input_msg_indices_types.push(CuInputMsg {
1774                            culist_index: output_pack.culist_index,
1775                            msg_type: msg_type.to_string(),
1776                            src_port,
1777                            edge_id,
1778                        });
1779                    } else {
1780                        #[cfg(all(feature = "std", feature = "macro_debug"))]
1781                        eprintln!("      ✗ Input from {pid} not ready, returning");
1782                        return (next_culist_output_index, handled);
1783                    }
1784                }
1785                let msg_types = collect_output_msg_types(graph, id);
1786                if msg_types.is_empty() {
1787                    panic!(
1788                        "Regular node '{}' has no outgoing connections",
1789                        node_ref.get_id()
1790                    );
1791                }
1792                output_msg_pack = Some(CuOutputPack {
1793                    culist_index: next_culist_output_index,
1794                    msg_types,
1795                });
1796                next_culist_output_index += 1;
1797            }
1798        }
1799
1800        sort_inputs_by_cnx_id(&mut input_msg_indices_types);
1801
1802        if let Some(pos) = plan
1803            .iter()
1804            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
1805        {
1806            #[cfg(all(feature = "std", feature = "macro_debug"))]
1807            eprintln!("    → Already in plan, modifying existing step");
1808            let mut step = plan.remove(pos);
1809            if let CuExecutionUnit::Step(ref mut s) = step {
1810                s.input_msg_indices_types = input_msg_indices_types;
1811            }
1812            plan.push(step);
1813        } else {
1814            #[cfg(all(feature = "std", feature = "macro_debug"))]
1815            eprintln!("    → New step added to plan");
1816            let step = CuExecutionStep {
1817                node_id: id,
1818                node: node_ref.clone(),
1819                task_type,
1820                input_msg_indices_types,
1821                output_msg_pack,
1822            };
1823            plan.push(CuExecutionUnit::Step(Box::new(step)));
1824        }
1825
1826        handled = true;
1827    }
1828
1829    #[cfg(all(feature = "std", feature = "macro_debug"))]
1830    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
1831    (next_culist_output_index, handled)
1832}
1833
1834/// This is the main heuristics to compute an execution plan at compilation time.
1835/// TODO(gbin): Make that heuristic pluggable.
1836pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
1837    #[cfg(all(feature = "std", feature = "macro_debug"))]
1838    eprintln!("[runtime plan]");
1839    let mut plan = Vec::new();
1840    let mut next_culist_output_index = 0u32;
1841
1842    let mut queue: VecDeque<NodeId> = graph
1843        .node_ids()
1844        .into_iter()
1845        .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
1846        .collect();
1847
1848    #[cfg(all(feature = "std", feature = "macro_debug"))]
1849    eprintln!("Initial source nodes: {queue:?}");
1850
1851    while let Some(start_node) = queue.pop_front() {
1852        #[cfg(all(feature = "std", feature = "macro_debug"))]
1853        eprintln!("→ Starting BFS from source {start_node}");
1854        for node_id in graph.bfs_nodes(start_node) {
1855            let already_in_plan = plan
1856                .iter()
1857                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
1858            if already_in_plan {
1859                #[cfg(all(feature = "std", feature = "macro_debug"))]
1860                eprintln!("    → Node {node_id} already planned, skipping");
1861                continue;
1862            }
1863
1864            #[cfg(all(feature = "std", feature = "macro_debug"))]
1865            eprintln!("    Planning from node {node_id}");
1866            let (new_index, handled) =
1867                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
1868            next_culist_output_index = new_index;
1869
1870            if !handled {
1871                #[cfg(all(feature = "std", feature = "macro_debug"))]
1872                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
1873                continue;
1874            }
1875
1876            #[cfg(all(feature = "std", feature = "macro_debug"))]
1877            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
1878            for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
1879                #[cfg(all(feature = "std", feature = "macro_debug"))]
1880                eprintln!("      → Enqueueing neighbor {neighbor}");
1881                queue.push_back(neighbor);
1882            }
1883        }
1884    }
1885
1886    let mut planned_nodes = BTreeSet::new();
1887    for unit in &plan {
1888        if let CuExecutionUnit::Step(step) = unit {
1889            planned_nodes.insert(step.node_id);
1890        }
1891    }
1892
1893    let mut missing = Vec::new();
1894    for node_id in graph.node_ids() {
1895        if !planned_nodes.contains(&node_id) {
1896            if let Some(node) = graph.get_node(node_id) {
1897                missing.push(node.get_id().to_string());
1898            } else {
1899                missing.push(format!("node_id_{node_id}"));
1900            }
1901        }
1902    }
1903
1904    if !missing.is_empty() {
1905        missing.sort();
1906        return Err(CuError::from(format!(
1907            "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1908            missing.join(", ")
1909        )));
1910    }
1911
1912    Ok(CuExecutionLoop {
1913        steps: plan,
1914        loop_count: None,
1915    })
1916}
1917
1918//tests
1919#[cfg(test)]
1920mod tests {
1921    use super::*;
1922    use crate::config::Node;
1923    use crate::context::CuContext;
1924    use crate::cutask::CuSinkTask;
1925    use crate::cutask::{CuSrcTask, Freezable};
1926    use crate::monitoring::NoMonitor;
1927    use crate::reflect::Reflect;
1928    use bincode::Encode;
1929    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1930    use serde_derive::{Deserialize, Serialize};
1931
1932    #[derive(Reflect)]
1933    pub struct TestSource {}
1934
1935    impl Freezable for TestSource {}
1936
1937    impl CuSrcTask for TestSource {
1938        type Resources<'r> = ();
1939        type Output<'m> = ();
1940        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
1941        where
1942            Self: Sized,
1943        {
1944            Ok(Self {})
1945        }
1946
1947        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
1948            Ok(())
1949        }
1950    }
1951
1952    #[derive(Reflect)]
1953    pub struct TestSink {}
1954
1955    impl Freezable for TestSink {}
1956
1957    impl CuSinkTask for TestSink {
1958        type Resources<'r> = ();
1959        type Input<'m> = ();
1960
1961        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
1962        where
1963            Self: Sized,
1964        {
1965            Ok(Self {})
1966        }
1967
1968        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
1969            Ok(())
1970        }
1971    }
1972
    // Those should be generated by the derive macro
    type Tasks = (TestSource, TestSink);
    type TestRuntime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;
    // Number of copper lists the test runtime keeps in flight.
    const TEST_NBCL: usize = 2;

    // Empty payload standing in for the macro-generated message set.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        // The stub message set carries no stamped data.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        // No task ids are associated with the empty message set.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        // Nothing to zero: the payload has no state.
        fn init_zeroed(&mut self) {}
    }
1996
1997    #[cfg(feature = "std")]
1998    fn tasks_instanciator(
1999        all_instances_configs: Vec<Option<&ComponentConfig>>,
2000        _resources: &mut ResourceManager,
2001    ) -> CuResult<Tasks> {
2002        Ok((
2003            TestSource::new(all_instances_configs[0], ())?,
2004            TestSink::new(all_instances_configs[1], ())?,
2005        ))
2006    }
2007
2008    #[cfg(not(feature = "std"))]
2009    fn tasks_instanciator(
2010        all_instances_configs: Vec<Option<&ComponentConfig>>,
2011        _resources: &mut ResourceManager,
2012    ) -> CuResult<Tasks> {
2013        Ok((
2014            TestSource::new(all_instances_configs[0], ())?,
2015            TestSink::new(all_instances_configs[1], ())?,
2016        ))
2017    }
2018
2019    fn monitor_instanciator(
2020        _config: &CuConfig,
2021        metadata: CuMonitoringMetadata,
2022        runtime: CuMonitoringRuntime,
2023    ) -> NoMonitor {
2024        NoMonitor::new(metadata, runtime).expect("NoMonitor::new should never fail")
2025    }
2026
    // No bridges are used in these tests; the instanciator is a no-op.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    // Builds an empty resource manager (the stub tasks request no resources).
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }
2034
    // Write sink that discards everything; used where the runtime requires a
    // log / copper-list stream but the test ignores the output.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
2043
    // Smoke test: a minimal source → sink graph must yield a buildable runtime.
    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime: CuResult<TestRuntime> =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    // The parallel runtime stays disabled for this test build.
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build());
        assert!(runtime.is_ok());
    }
2072
2073    #[test]
2074    fn test_rate_target_period_rejects_zero() {
2075        let err = rate_target_period(0).expect_err("zero rate target should fail");
2076        assert!(
2077            err.to_string()
2078                .contains("Runtime rate target cannot be zero"),
2079            "unexpected error: {err}"
2080        );
2081    }
2082
2083    #[test]
2084    fn test_loop_rate_limiter_advances_to_next_period_when_on_time() {
2085        let (clock, mock) = RobotClock::mock();
2086        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
2087        assert_eq!(limiter.next_deadline(), CuDuration::from(10_000_000));
2088
2089        mock.set_value(10_000_000);
2090        limiter.mark_tick(&clock);
2091
2092        assert_eq!(limiter.next_deadline(), CuDuration::from(20_000_000));
2093    }
2094
2095    #[test]
2096    fn test_loop_rate_limiter_skips_missed_periods_without_resetting_phase() {
2097        let (clock, mock) = RobotClock::mock();
2098        let mut limiter = LoopRateLimiter::from_rate_target_hz(100, &clock).unwrap();
2099
2100        mock.set_value(35_000_000);
2101        limiter.mark_tick(&clock);
2102
2103        assert_eq!(limiter.next_deadline(), CuDuration::from(40_000_000));
2104    }
2105
2106    #[cfg(all(feature = "std", feature = "high-precision-limiter"))]
2107    #[test]
2108    fn test_loop_rate_limiter_spin_window_is_fixed_scheduler_window() {
2109        let (clock, _) = RobotClock::mock();
2110        let limiter = LoopRateLimiter::from_rate_target_hz(1_000, &clock).unwrap();
2111        assert_eq!(limiter.spin_window(), CuDuration::from(200_000));
2112
2113        let fast = LoopRateLimiter::from_rate_target_hz(10_000, &clock).unwrap();
2114        assert_eq!(fast.spin_window(), CuDuration::from(200_000));
2115    }
2116
    // Exercises the copper-list ring buffer: creation, capacity exhaustion,
    // and both in-order and out-of-order freeing (TEST_NBCL = 2 lists).
    #[cfg(not(feature = "async-cl-io"))]
    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime: TestRuntime =
            CuRuntimeBuilder::<Tasks, (), Msgs, NoMonitor, TEST_NBCL, _, _, _, _, _>::new(
                RobotClock::default(),
                &config,
                crate::config::DEFAULT_MISSION_ID,
                CuRuntimeParts::new(
                    tasks_instanciator,
                    &[],
                    &[],
                    #[cfg(all(feature = "std", feature = "parallel-rt"))]
                    &crate::parallel_rt::DISABLED_PARALLEL_RT_METADATA,
                    monitor_instanciator,
                    bridges_instanciator,
                ),
                FakeWriter {},
                FakeWriter {},
            )
            .try_with_resources_instantiator(resources_instanciator)
            .and_then(|builder| builder.build())
            .unwrap();

        // Now emulates the generated runtime
        {
            // Take the first list; one of the two slots remains free.
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        {
            // Take the second list; capacity is now exhausted.
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
        }

        {
            // A third create must fail while both lists are in flight.
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.create();
            assert!(culist2.is_err());
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            // Free in order, should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 1);
        }

        // Readd a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);
            // Free out of order, the #0 first
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack
            assert_eq!(copperlists.available_copper_lists().unwrap(), 0);

            // Free up the top of the stack
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs

            assert_eq!(copperlists.available_copper_lists().unwrap(), 2);
        }
    }
2202
    // Test writer that records every logged copper-list id and sleeps briefly
    // to emulate a slow backing store, forcing the async path to queue work.
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[derive(Debug, Default)]
    struct RecordingWriter {
        // Shared with the test body so flush order can be asserted afterwards.
        ids: Arc<Mutex<Vec<u64>>>,
    }

    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    impl WriteStream<CopperList<Msgs>> for RecordingWriter {
        fn log(&mut self, culist: &CopperList<Msgs>) -> CuResult<()> {
            self.ids.lock().unwrap().push(culist.id);
            // Simulated slow I/O: keeps several lists in flight at once.
            std::thread::sleep(std::time::Duration::from_millis(2));
            Ok(())
        }
    }
2217
    // Verifies the async copper-list writer flushes lists in id order even
    // though each individual write is artificially slow.
    #[cfg(all(feature = "std", feature = "async-cl-io"))]
    #[test]
    fn test_async_copperlists_manager_flushes_in_order() {
        let ids = Arc::new(Mutex::new(Vec::new()));
        let mut copperlists = CopperListsManager::<Msgs, 4>::new(Some(Box::new(RecordingWriter {
            ids: ids.clone(),
        })))
        .unwrap();

        // Cycle all 4 lists through create → processing → done.
        for expected_id in 0..4 {
            let culist = copperlists.create().unwrap();
            assert_eq!(culist.id, expected_id);
            culist.change_state(CopperListState::Processing);
            copperlists.end_of_processing(expected_id).unwrap();
        }

        // Wait for the background writer, then check order and availability.
        copperlists.finish_pending().unwrap();
        assert_eq!(copperlists.available_copper_lists().unwrap(), 4);
        assert_eq!(*ids.lock().unwrap(), vec![0, 1, 2, 3]);
    }
2238
2239    #[test]
2240    fn test_runtime_task_input_order() {
2241        let mut config = CuConfig::default();
2242        let graph = config.get_graph_mut(None).unwrap();
2243        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
2244        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
2245        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
2246
2247        assert_eq!(src1_id, 0);
2248        assert_eq!(src2_id, 1);
2249
2250        // note that the source2 connection is before the source1
2251        let src1_type = "src1_type";
2252        let src2_type = "src2_type";
2253        graph.connect(src2_id, sink_id, src2_type).unwrap();
2254        graph.connect(src1_id, sink_id, src1_type).unwrap();
2255
2256        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
2257        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
2258        // the edge id depends on the order the connection is created, not
2259        // on the node id, and that is what determines the input order
2260        assert_eq!(src1_edge_id, 1);
2261        assert_eq!(src2_edge_id, 0);
2262
2263        let runtime = compute_runtime_plan(graph).unwrap();
2264        let sink_step = runtime
2265            .steps
2266            .iter()
2267            .find_map(|step| match step {
2268                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
2269                _ => None,
2270            })
2271            .unwrap();
2272
2273        // since the src2 connection was added before src1 connection, the src2 type should be
2274        // first
2275        assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
2276        assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
2277    }
2278
2279    #[test]
2280    fn test_runtime_output_ports_unique_ordered() {
2281        let mut config = CuConfig::default();
2282        let graph = config.get_graph_mut(None).unwrap();
2283        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2284        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2285        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2286        let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
2287        let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
2288
2289        graph.connect(src_id, dst_a_id, "msg::A").unwrap();
2290        graph.connect(src_id, dst_b_id, "msg::B").unwrap();
2291        graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
2292        graph.connect(src_id, dst_c_id, "msg::C").unwrap();
2293
2294        let runtime = compute_runtime_plan(graph).unwrap();
2295        let src_step = runtime
2296            .steps
2297            .iter()
2298            .find_map(|step| match step {
2299                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2300                _ => None,
2301            })
2302            .unwrap();
2303
2304        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2305        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
2306
2307        let dst_a_step = runtime
2308            .steps
2309            .iter()
2310            .find_map(|step| match step {
2311                CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
2312                _ => None,
2313            })
2314            .unwrap();
2315        let dst_b_step = runtime
2316            .steps
2317            .iter()
2318            .find_map(|step| match step {
2319                CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
2320                _ => None,
2321            })
2322            .unwrap();
2323        let dst_a2_step = runtime
2324            .steps
2325            .iter()
2326            .find_map(|step| match step {
2327                CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
2328                _ => None,
2329            })
2330            .unwrap();
2331        let dst_c_step = runtime
2332            .steps
2333            .iter()
2334            .find_map(|step| match step {
2335                CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
2336                _ => None,
2337            })
2338            .unwrap();
2339
2340        assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
2341        assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
2342        assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
2343        assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
2344    }
2345
2346    #[test]
2347    fn test_runtime_output_ports_fanout_single() {
2348        let mut config = CuConfig::default();
2349        let graph = config.get_graph_mut(None).unwrap();
2350        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2351        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
2352        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
2353
2354        graph.connect(src_id, dst_a_id, "i32").unwrap();
2355        graph.connect(src_id, dst_b_id, "i32").unwrap();
2356
2357        let runtime = compute_runtime_plan(graph).unwrap();
2358        let src_step = runtime
2359            .steps
2360            .iter()
2361            .find_map(|step| match step {
2362                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2363                _ => None,
2364            })
2365            .unwrap();
2366
2367        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2368        assert_eq!(output_pack.msg_types, vec!["i32"]);
2369    }
2370
2371    #[test]
2372    fn test_runtime_output_ports_include_nc_outputs() {
2373        let mut config = CuConfig::default();
2374        let graph = config.get_graph_mut(None).unwrap();
2375        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
2376        let dst_id = graph.add_node(Node::new("dst", "Sink")).unwrap();
2377        graph.connect(src_id, dst_id, "msg::A").unwrap();
2378        graph
2379            .get_node_mut(src_id)
2380            .expect("missing source node")
2381            .add_nc_output("msg::B", usize::MAX);
2382
2383        let runtime = compute_runtime_plan(graph).unwrap();
2384        let src_step = runtime
2385            .steps
2386            .iter()
2387            .find_map(|step| match step {
2388                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2389                _ => None,
2390            })
2391            .unwrap();
2392        let dst_step = runtime
2393            .steps
2394            .iter()
2395            .find_map(|step| match step {
2396                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2397                _ => None,
2398            })
2399            .unwrap();
2400
2401        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2402        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2403        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 0);
2404    }
2405
2406    #[test]
2407    fn test_runtime_output_ports_respect_connection_order_with_nc() {
2408        let txt = r#"(
2409            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2410            cnx: [
2411                (src: "src", dst: "__nc__", msg: "msg::A"),
2412                (src: "src", dst: "sink", msg: "msg::B"),
2413            ]
2414        )"#;
2415        let config = CuConfig::deserialize_ron(txt).unwrap();
2416        let graph = config.get_graph(None).unwrap();
2417        let src_id = graph.get_node_id_by_name("src").unwrap();
2418        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2419
2420        let runtime = compute_runtime_plan(graph).unwrap();
2421        let src_step = runtime
2422            .steps
2423            .iter()
2424            .find_map(|step| match step {
2425                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2426                _ => None,
2427            })
2428            .unwrap();
2429        let dst_step = runtime
2430            .steps
2431            .iter()
2432            .find_map(|step| match step {
2433                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2434                _ => None,
2435            })
2436            .unwrap();
2437
2438        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2439        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2440        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2441    }
2442
2443    #[cfg(feature = "std")]
2444    #[test]
2445    fn test_runtime_output_ports_respect_connection_order_with_nc_from_file() {
2446        let txt = r#"(
2447            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2448            cnx: [
2449                (src: "src", dst: "__nc__", msg: "msg::A"),
2450                (src: "src", dst: "sink", msg: "msg::B"),
2451            ]
2452        )"#;
2453        let tmp = tempfile::NamedTempFile::new().unwrap();
2454        std::fs::write(tmp.path(), txt).unwrap();
2455        let config = crate::config::read_configuration(tmp.path().to_str().unwrap()).unwrap();
2456        let graph = config.get_graph(None).unwrap();
2457        let src_id = graph.get_node_id_by_name("src").unwrap();
2458        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2459
2460        let runtime = compute_runtime_plan(graph).unwrap();
2461        let src_step = runtime
2462            .steps
2463            .iter()
2464            .find_map(|step| match step {
2465                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2466                _ => None,
2467            })
2468            .unwrap();
2469        let dst_step = runtime
2470            .steps
2471            .iter()
2472            .find_map(|step| match step {
2473                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2474                _ => None,
2475            })
2476            .unwrap();
2477
2478        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2479        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B"]);
2480        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2481    }
2482
2483    #[test]
2484    fn test_runtime_output_ports_respect_connection_order_with_nc_primitives() {
2485        let txt = r#"(
2486            tasks: [(id: "src", type: "a"), (id: "sink", type: "b")],
2487            cnx: [
2488                (src: "src", dst: "__nc__", msg: "i32"),
2489                (src: "src", dst: "sink", msg: "bool"),
2490            ]
2491        )"#;
2492        let config = CuConfig::deserialize_ron(txt).unwrap();
2493        let graph = config.get_graph(None).unwrap();
2494        let src_id = graph.get_node_id_by_name("src").unwrap();
2495        let dst_id = graph.get_node_id_by_name("sink").unwrap();
2496
2497        let runtime = compute_runtime_plan(graph).unwrap();
2498        let src_step = runtime
2499            .steps
2500            .iter()
2501            .find_map(|step| match step {
2502                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
2503                _ => None,
2504            })
2505            .unwrap();
2506        let dst_step = runtime
2507            .steps
2508            .iter()
2509            .find_map(|step| match step {
2510                CuExecutionUnit::Step(step) if step.node_id == dst_id => Some(step),
2511                _ => None,
2512            })
2513            .unwrap();
2514
2515        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
2516        assert_eq!(output_pack.msg_types, vec!["i32", "bool"]);
2517        assert_eq!(dst_step.input_msg_indices_types[0].src_port, 1);
2518    }
2519
2520    #[test]
2521    fn test_runtime_plan_diamond_case1() {
2522        // more complex topology that tripped the scheduler
2523        let mut config = CuConfig::default();
2524        let graph = config.get_graph_mut(None).unwrap();
2525        let cam0_id = graph
2526            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
2527            .unwrap();
2528        let inf0_id = graph
2529            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
2530            .unwrap();
2531        let broadcast_id = graph
2532            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
2533            .unwrap();
2534
2535        // case 1 order
2536        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
2537        graph.connect(cam0_id, inf0_id, "i32").unwrap();
2538        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
2539
2540        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
2541        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
2542
2543        assert_eq!(edge_cam0_to_inf0, 0);
2544        assert_eq!(edge_cam0_to_broadcast, 1);
2545
2546        let runtime = compute_runtime_plan(graph).unwrap();
2547        let broadcast_step = runtime
2548            .steps
2549            .iter()
2550            .find_map(|step| match step {
2551                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
2552                _ => None,
2553            })
2554            .unwrap();
2555
2556        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
2557        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
2558    }
2559
2560    #[test]
2561    fn test_runtime_plan_diamond_case2() {
2562        // more complex topology that tripped the scheduler variation 2
2563        let mut config = CuConfig::default();
2564        let graph = config.get_graph_mut(None).unwrap();
2565        let cam0_id = graph
2566            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
2567            .unwrap();
2568        let inf0_id = graph
2569            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
2570            .unwrap();
2571        let broadcast_id = graph
2572            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
2573            .unwrap();
2574
2575        // case 2 order
2576        graph.connect(cam0_id, inf0_id, "i32").unwrap();
2577        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
2578        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
2579
2580        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
2581        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
2582
2583        assert_eq!(edge_cam0_to_broadcast, 0);
2584        assert_eq!(edge_cam0_to_inf0, 1);
2585
2586        let runtime = compute_runtime_plan(graph).unwrap();
2587        let broadcast_step = runtime
2588            .steps
2589            .iter()
2590            .find_map(|step| match step {
2591                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
2592                _ => None,
2593            })
2594            .unwrap();
2595
2596        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
2597        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
2598    }
2599}