Skip to main content

cu29_runtime/
curuntime.rs

1//! CuRuntime is the heart of what copper is running on the robot.
2//! It is exposed to the user via the `copper_runtime` macro injecting it as a field in their application struct.
3//!
4
5use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9#[cfg(feature = "std")]
10use crate::monitoring::ExecutionProbeHandle;
11use crate::monitoring::{
12    CuMonitor, ExecutionMarker, RuntimeExecutionProbe, build_monitor_topology,
13};
14use crate::resource::ResourceManager;
15use cu29_clock::{ClockProvider, CuTime, RobotClock};
16use cu29_traits::CuResult;
17use cu29_traits::WriteStream;
18use cu29_traits::{CopperListTuple, CuError};
19
20#[cfg(target_os = "none")]
21#[allow(unused_imports)]
22use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
23#[cfg(target_os = "none")]
24#[allow(unused_imports)]
25use cu29_log_derive::info;
26#[cfg(target_os = "none")]
27#[allow(unused_imports)]
28use cu29_log_runtime::log;
29#[cfg(all(target_os = "none", debug_assertions))]
30#[allow(unused_imports)]
31use cu29_log_runtime::log_debug_mode;
32#[cfg(target_os = "none")]
33#[allow(unused_imports)]
34use cu29_value::to_value;
35
36use alloc::boxed::Box;
37use alloc::collections::{BTreeSet, VecDeque};
38use alloc::format;
39use alloc::string::{String, ToString};
40use alloc::vec::Vec;
41use bincode::enc::EncoderImpl;
42use bincode::enc::write::{SizeWriter, SliceWriter};
43use bincode::error::EncodeError;
44use bincode::{Decode, Encode};
45use core::fmt::Result as FmtResult;
46use core::fmt::{Debug, Formatter};
47
48#[cfg(feature = "std")]
49use cu29_log_runtime::LoggerRuntime;
50#[cfg(feature = "std")]
51use cu29_unifiedlog::UnifiedLoggerWrite;
52#[cfg(feature = "std")]
53use std::sync::{Arc, Mutex};
54
/// Just a simple struct to hold the various bits needed to run a Copper application.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Shared, thread-safe handle to the unified logger sink.
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Runtime side of the structured-logging machinery.
    pub logger_runtime: LoggerRuntime,
    /// The robot clock shared across the application.
    pub clock: RobotClock,
}
62
/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// Backing storage for the in-flight copper lists (at most NBCL at a time).
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks); `None` when task logging is disabled.
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
    /// Last encoded size returned by logger.log
    pub last_encoded_bytes: u64,
}
71
72impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
73    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
74        let mut is_top = true;
75        let mut nb_done = 0;
76        for cl in self.inner.iter_mut() {
77            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
78                cl.change_state(CopperListState::DoneProcessing);
79            }
80            if is_top && cl.get_state() == CopperListState::DoneProcessing {
81                if let Some(logger) = &mut self.logger {
82                    cl.change_state(CopperListState::BeingSerialized);
83                    logger.log(cl)?;
84                    self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
85                }
86                cl.change_state(CopperListState::Free);
87                nb_done += 1;
88            } else {
89                is_top = false;
90            }
91        }
92        for _ in 0..nb_done {
93            let _ = self.inner.pop();
94        }
95        Ok(())
96    }
97
98    pub fn available_copper_lists(&self) -> usize {
99        NBCL - self.inner.len()
100    }
101}
102
/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Optional override for the timestamp to stamp the next keyframe (used by deterministic replay).
    forced_timestamp: Option<CuTime>,

    /// If set, reuse this keyframe verbatim (e.g., during replay) instead of re-freezing state.
    locked: bool,

    /// Logger for the state of the tasks (frozen tasks); `None` when keyframe logging is disabled.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists
    /// (set to 0 together with `logger: None` when logging is disabled).
    keyframe_interval: u32,

    /// Bytes written by the last keyframe log
    pub last_encoded_bytes: u64,
}
123
124impl KeyFramesManager {
125    fn is_keyframe(&self, culistid: u64) -> bool {
126        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval as u64)
127    }
128
129    pub fn reset(&mut self, culistid: u64, clock: &RobotClock) {
130        if self.is_keyframe(culistid) {
131            // If a recorded keyframe was preloaded for this CL, keep it as-is.
132            if self.locked && self.inner.culistid == culistid {
133                return;
134            }
135            let ts = self.forced_timestamp.take().unwrap_or_else(|| clock.now());
136            self.inner.reset(culistid, ts);
137            self.locked = false;
138        }
139    }
140
141    /// Force the timestamp of the next keyframe to a given value.
142    #[cfg(feature = "std")]
143    pub fn set_forced_timestamp(&mut self, ts: CuTime) {
144        self.forced_timestamp = Some(ts);
145    }
146
147    pub fn freeze_task(&mut self, culistid: u64, task: &impl Freezable) -> CuResult<usize> {
148        if self.is_keyframe(culistid) {
149            if self.locked {
150                // We are replaying a recorded keyframe verbatim; don't mutate it.
151                return Ok(0);
152            }
153            if self.inner.culistid != culistid {
154                return Err(CuError::from(format!(
155                    "Freezing task for culistid {} but current keyframe is {}",
156                    culistid, self.inner.culistid
157                )));
158            }
159            self.inner
160                .add_frozen_task(task)
161                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
162        } else {
163            Ok(0)
164        }
165    }
166
167    /// Generic helper to freeze any `Freezable` state (task or bridge) into the current keyframe.
168    pub fn freeze_any(&mut self, culistid: u64, item: &impl Freezable) -> CuResult<usize> {
169        self.freeze_task(culistid, item)
170    }
171
172    pub fn end_of_processing(&mut self, culistid: u64) -> CuResult<()> {
173        if self.is_keyframe(culistid) {
174            let logger = self.logger.as_mut().unwrap();
175            logger.log(&self.inner)?;
176            self.last_encoded_bytes = logger.last_log_bytes().unwrap_or(0) as u64;
177            // Clear the lock so the next CL can rebuild normally unless re-locked.
178            self.locked = false;
179            Ok(())
180        } else {
181            // Not a keyframe for this CL; ensure we don't carry stale sizes forward.
182            self.last_encoded_bytes = 0;
183            Ok(())
184        }
185    }
186
187    /// Preload a recorded keyframe so it is logged verbatim on the matching CL.
188    #[cfg(feature = "std")]
189    pub fn lock_keyframe(&mut self, keyframe: &KeyFrame) {
190        self.inner = keyframe.clone();
191        self.forced_timestamp = Some(keyframe.timestamp);
192        self.locked = true;
193    }
194}
195
/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// CL is the type of the copper list, representing the input/output messages for all the tasks.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    pub resources: ResourceManager,

    /// The runtime monitoring.
    pub monitor: M,

    /// Runtime-side execution progress probe for watchdog/diagnostic monitors.
    ///
    /// This probe is written from the generated execution plan before each component
    /// step. Monitors consume it asynchronously (typically from watchdog threads) to
    /// report the last known component/step/culist when the runtime appears stalled.
    /// On std the probe is an `Arc`-style shared handle; on no-std it is held inline.
    #[cfg(feature = "std")]
    pub execution_probe: ExecutionProbeHandle,
    #[cfg(not(feature = "std"))]
    pub execution_probe: RuntimeExecutionProbe,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}
234
235/// To be able to share the clock we make the runtime a clock provider.
236impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
237    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
238{
239    fn get_clock(&self) -> RobotClock {
240        self.clock.clone()
241    }
242}
243
/// A KeyFrame is recording a snapshot of the tasks state before a given copperlist.
/// It is a double encapsulation: this one recording the culistid and another even in
/// bincode in the serialized_tasks.
#[derive(Clone, Encode, Decode)]
pub struct KeyFrame {
    /// This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u64,
    /// This is the timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    /// This is the bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}
256
257impl KeyFrame {
258    fn new() -> Self {
259        KeyFrame {
260            culistid: 0,
261            timestamp: CuTime::default(),
262            serialized_tasks: Vec::new(),
263        }
264    }
265
266    /// This is to be able to avoid reallocations
267    fn reset(&mut self, culistid: u64, timestamp: CuTime) {
268        self.culistid = culistid;
269        self.timestamp = timestamp;
270        self.serialized_tasks.clear();
271    }
272
273    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
274    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
275        let cfg = bincode::config::standard();
276        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
277        BincodeAdapter(task).encode(&mut sizer)?;
278        let need = sizer.into_writer().bytes_written as usize;
279
280        let start = self.serialized_tasks.len();
281        self.serialized_tasks.resize(start + need, 0);
282        let mut enc =
283            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
284        BincodeAdapter(task).encode(&mut enc)?;
285        Ok(need)
286    }
287}
288
/// Identifies where the effective runtime configuration came from.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleConfigSource {
    /// Configuration overridden programmatically by the application.
    ProgrammaticOverride,
    /// Configuration loaded from an external file at runtime.
    ExternalFile,
    /// Configuration bundled with the application binary (default).
    BundledDefault,
}
296
/// Build-time stack identification metadata.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleStackInfo {
    /// Application name.
    pub app_name: String,
    /// Application version string.
    pub app_version: String,
    /// Git commit hash of the build, when available.
    pub git_commit: Option<String>,
    /// Whether the working tree was dirty at build time, when known.
    pub git_dirty: Option<bool>,
}
305
/// Runtime lifecycle events emitted in the dedicated lifecycle section.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub enum RuntimeLifecycleEvent {
    /// Emitted when the runtime is instantiated, capturing the effective config and build info.
    Instantiated {
        config_source: RuntimeLifecycleConfigSource,
        /// The full effective configuration, serialized as RON.
        effective_config_ron: String,
        stack: RuntimeLifecycleStackInfo,
    },
    /// A mission started running.
    MissionStarted {
        mission: String,
    },
    /// A mission stopped running.
    MissionStopped {
        mission: String,
        // TODO(lifecycle): replace free-form reason with a typed stop reason enum once
        // std/no-std behavior and panic integration are split in a follow-up PR.
        reason: String,
    },
    // TODO(lifecycle): wire panic hook / no_std equivalent to emit this event consistently.
    /// A panic was observed, with the best-effort source location.
    Panic {
        message: String,
        file: Option<String>,
        line: Option<u32>,
        column: Option<u32>,
    },
    /// The runtime finished shutting down cleanly.
    ShutdownCompleted,
}
332
/// One event record persisted in the `UnifiedLogType::RuntimeLifecycle` section.
#[derive(Clone, Encode, Decode, Debug, PartialEq, Eq)]
pub struct RuntimeLifecycleRecord {
    /// Robot-clock time at which the event was recorded.
    pub timestamp: CuTime,
    /// The lifecycle event itself.
    pub event: RuntimeLifecycleEvent,
}
339
impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    /// Records runtime execution progress in the shared probe.
    ///
    /// This is intentionally lightweight and does not call monitor callbacks.
    #[inline]
    pub fn record_execution_marker(&self, marker: ExecutionMarker) {
        self.execution_probe.record(marker);
    }

    /// Builds the runtime (std): instantiates the resources via `resources_instanciator`
    /// and then delegates everything else to [`Self::new_with_resources`].
    // FIXME(gbin): this became REALLY ugly with no-std
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let resources = resources_instanciator(config)?;
        Self::new_with_resources(
            clock,
            config,
            mission,
            resources,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            copperlists_logger,
            keyframes_logger,
        )
    }

    /// Builds the runtime (std) from an already-constructed `ResourceManager`:
    /// instantiates tasks, monitor (with execution probe and topology), bridges,
    /// then wires the copper-list and keyframe loggers according to `config.logging`.
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new_with_resources(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        mut resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        // One optional config per node, in node order, handed to the tasks instanciator.
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
        let mut monitor = monitor_instanciator(config);
        // Shared probe: the run loop writes progress markers, watchdog monitors read them.
        let execution_probe = std::sync::Arc::new(RuntimeExecutionProbe::default());
        monitor.set_execution_probe(execution_probe.clone());
        // Topology is best-effort: a failure to build it doesn't prevent startup.
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config, &mut resources)?;

        // Decide whether CL/keyframe logging is active and at which keyframe interval.
        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // explicit no enable logging
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
            last_encoded_bytes: 0,
        };
        // Bare-metal builds log the memory footprint of the preallocated copper lists.
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
            last_encoded_bytes: 0,
            forced_timestamp: None,
            locked: false,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            execution_probe,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    /// Builds the runtime (no-std): same as the std `new`, with bare-metal progress logging.
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: resources instanciator");
        let resources = resources_instanciator(config)?;
        Self::new_with_resources(
            clock,
            config,
            mission,
            resources,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            copperlists_logger,
            keyframes_logger,
        )
    }

    /// Builds the runtime (no-std) from an already-constructed `ResourceManager`.
    /// Mirrors the std variant, but the execution probe is held inline (no `Arc`)
    /// and progress is logged step-by-step on bare-metal targets.
    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new_with_resources(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        mut resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: get graph");
        let graph = config.get_graph(mission)?;
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: graph ok");
        // One optional config per node, in node order, handed to the tasks instanciator.
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: tasks instanciator");
        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator");
        let mut monitor = monitor_instanciator(config);
        // No-std: the probe lives inline in the runtime, no shared handle to the monitor.
        let execution_probe = RuntimeExecutionProbe::default();
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator ok");
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: build monitor topology");
        // Topology is best-effort: a failure to build it doesn't prevent startup.
        if let Ok(topology) = build_monitor_topology(config, mission) {
            #[cfg(target_os = "none")]
            info!("CuRuntime::new: monitor topology ok");
            monitor.set_topology(topology);
            #[cfg(target_os = "none")]
            info!("CuRuntime::new: monitor topology set");
        }
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: bridges instanciator");
        let bridges = bridges_instanciator(config, &mut resources)?;

        // Decide whether CL/keyframe logging is active and at which keyframe interval.
        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // explicit no enable logging
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
            last_encoded_bytes: 0,
        };
        // Bare-metal builds log the memory footprint of the preallocated copper lists.
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
            last_encoded_bytes: 0,
            forced_timestamp: None,
            locked: false,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            execution_probe,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}
613
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// Only produces output messages (no incoming edges in the graph).
    Source,
    /// Consumes and produces messages.
    Regular,
    /// Only consumes input messages (no outgoing edges in the graph).
    Sink,
}
624
/// Describes where a task's output lands in the copper list and what it carries.
#[derive(Debug, Clone)]
pub struct CuOutputPack {
    /// Index assigned to this task's output slot in the copper list.
    pub culist_index: u32,
    /// Message types produced by the task; the position of a type in this Vec
    /// is the source port index consumers refer to.
    pub msg_types: Vec<String>,
}
630
/// Describes one input connection of a task within the execution plan.
#[derive(Debug, Clone)]
pub struct CuInputMsg {
    /// Copper-list index where the producing task writes its output.
    pub culist_index: u32,
    /// Message type carried over this connection.
    pub msg_type: String,
    /// Port index on the producer (position of `msg_type` in the producer's output pack).
    pub src_port: usize,
    /// Index of the corresponding edge in the config graph; inputs are ordered by it.
    pub edge_id: usize,
}
638
/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<CuInputMsg>,

    /// the index in the copper list of the output message and its type
    /// (`None` when the step produces no output)
    pub output_msg_pack: Option<CuOutputPack>,
}
654
655impl Debug for CuExecutionStep {
656    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
657        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
658        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
659        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
660        f.write_str(
661            format!(
662                "              input_msg_types: {:?}\n",
663                self.input_msg_indices_types
664            )
665            .as_str(),
666        )?;
667        f.write_str(format!("       output_msg_pack: {:?}\n", self.output_msg_pack).as_str())?;
668        Ok(())
669    }
670}
671
/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of Execution units (loop or steps) that are executed
/// multiple times.
/// if loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    /// The units (steps or nested loops) executed on each iteration, in order.
    pub steps: Vec<CuExecutionUnit>,
    /// Number of iterations; `None` means loop forever.
    pub loop_count: Option<u32>,
}
680
681impl Debug for CuExecutionLoop {
682    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
683        f.write_str("CuExecutionLoop:\n")?;
684        for step in &self.steps {
685            match step {
686                CuExecutionUnit::Step(step) => {
687                    step.fmt(f)?;
688                }
689                CuExecutionUnit::Loop(l) => {
690                    l.fmt(f)?;
691                }
692            }
693        }
694
695        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
696        Ok(())
697    }
698}
699
/// One unit of the execution plan: either a single step or a nested loop of units.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single task execution step.
    Step(CuExecutionStep),
    /// A (possibly infinite) loop over a sequence of units.
    Loop(CuExecutionLoop),
}
706
707fn find_output_pack_from_nodeid(
708    node_id: NodeId,
709    steps: &Vec<CuExecutionUnit>,
710) -> Option<CuOutputPack> {
711    for step in steps {
712        match step {
713            CuExecutionUnit::Loop(loop_unit) => {
714                if let Some(output_pack) = find_output_pack_from_nodeid(node_id, &loop_unit.steps) {
715                    return Some(output_pack);
716                }
717            }
718            CuExecutionUnit::Step(step) => {
719                if step.node_id == node_id {
720                    return step.output_msg_pack.clone();
721                }
722            }
723        }
724    }
725    None
726}
727
728pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
729    if graph.incoming_neighbor_count(node_id) == 0 {
730        CuTaskType::Source
731    } else if graph.outgoing_neighbor_count(node_id) == 0 {
732        CuTaskType::Sink
733    } else {
734        CuTaskType::Regular
735    }
736}
737
738/// The connection id used here is the index of the config graph edge that equates to the wanted
739/// connection.
740fn sort_inputs_by_cnx_id(input_msg_indices_types: &mut [CuInputMsg]) {
741    input_msg_indices_types.sort_by_key(|input| input.edge_id);
742}
743
744fn collect_output_msg_types(graph: &CuGraph, node_id: NodeId) -> Vec<String> {
745    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
746    edge_ids.sort();
747
748    let mut msg_types = Vec::new();
749    let mut seen = Vec::new();
750    for edge_id in edge_ids {
751        if let Some(edge) = graph.edge(edge_id) {
752            if seen.iter().any(|msg| msg == &edge.msg) {
753                continue;
754            }
755            seen.push(edge.msg.clone());
756            msg_types.push(edge.msg.clone());
757        }
758    }
759    msg_types
760}
761/// Explores a subbranch and build the partial plan out of it.
762fn plan_tasks_tree_branch(
763    graph: &CuGraph,
764    mut next_culist_output_index: u32,
765    starting_point: NodeId,
766    plan: &mut Vec<CuExecutionUnit>,
767) -> (u32, bool) {
768    #[cfg(all(feature = "std", feature = "macro_debug"))]
769    eprintln!("-- starting branch from node {starting_point}");
770
771    let mut handled = false;
772
773    for id in graph.bfs_nodes(starting_point) {
774        let node_ref = graph.get_node(id).unwrap();
775        #[cfg(all(feature = "std", feature = "macro_debug"))]
776        eprintln!("  Visiting node: {node_ref:?}");
777
778        let mut input_msg_indices_types: Vec<CuInputMsg> = Vec::new();
779        let output_msg_pack: Option<CuOutputPack>;
780        let task_type = find_task_type_for_id(graph, id);
781
782        match task_type {
783            CuTaskType::Source => {
784                #[cfg(all(feature = "std", feature = "macro_debug"))]
785                eprintln!("    → Source node, assign output index {next_culist_output_index}");
786                let msg_types = collect_output_msg_types(graph, id);
787                if msg_types.is_empty() {
788                    panic!(
789                        "Source node '{}' has no outgoing connections",
790                        node_ref.get_id()
791                    );
792                }
793                output_msg_pack = Some(CuOutputPack {
794                    culist_index: next_culist_output_index,
795                    msg_types,
796                });
797                next_culist_output_index += 1;
798            }
799            CuTaskType::Sink => {
800                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
801                edge_ids.sort();
802                #[cfg(all(feature = "std", feature = "macro_debug"))]
803                eprintln!("    → Sink with incoming edges: {edge_ids:?}");
804                for edge_id in edge_ids {
805                    let edge = graph
806                        .edge(edge_id)
807                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
808                    let pid = graph
809                        .get_node_id_by_name(edge.src.as_str())
810                        .unwrap_or_else(|| {
811                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
812                        });
813                    let output_pack = find_output_pack_from_nodeid(pid, plan);
814                    if let Some(output_pack) = output_pack {
815                        #[cfg(all(feature = "std", feature = "macro_debug"))]
816                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
817                        let msg_type = edge.msg.as_str();
818                        let src_port = output_pack
819                            .msg_types
820                            .iter()
821                            .position(|msg| msg == msg_type)
822                            .unwrap_or_else(|| {
823                                panic!(
824                                    "Missing output port for message type '{msg_type}' on node {pid}"
825                                )
826                            });
827                        input_msg_indices_types.push(CuInputMsg {
828                            culist_index: output_pack.culist_index,
829                            msg_type: msg_type.to_string(),
830                            src_port,
831                            edge_id,
832                        });
833                    } else {
834                        #[cfg(all(feature = "std", feature = "macro_debug"))]
835                        eprintln!("      ✗ Input from {pid} not ready, returning");
836                        return (next_culist_output_index, handled);
837                    }
838                }
839                output_msg_pack = Some(CuOutputPack {
840                    culist_index: next_culist_output_index,
841                    msg_types: Vec::from(["()".to_string()]),
842                });
843                next_culist_output_index += 1;
844            }
845            CuTaskType::Regular => {
846                let mut edge_ids = graph.get_dst_edges(id).unwrap_or_default();
847                edge_ids.sort();
848                #[cfg(all(feature = "std", feature = "macro_debug"))]
849                eprintln!("    → Regular task with incoming edges: {edge_ids:?}");
850                for edge_id in edge_ids {
851                    let edge = graph
852                        .edge(edge_id)
853                        .unwrap_or_else(|| panic!("Missing edge {edge_id} for node {id}"));
854                    let pid = graph
855                        .get_node_id_by_name(edge.src.as_str())
856                        .unwrap_or_else(|| {
857                            panic!("Missing source node '{}' for edge {edge_id}", edge.src)
858                        });
859                    let output_pack = find_output_pack_from_nodeid(pid, plan);
860                    if let Some(output_pack) = output_pack {
861                        #[cfg(all(feature = "std", feature = "macro_debug"))]
862                        eprintln!("      ✓ Input from {pid} ready: {output_pack:?}");
863                        let msg_type = edge.msg.as_str();
864                        let src_port = output_pack
865                            .msg_types
866                            .iter()
867                            .position(|msg| msg == msg_type)
868                            .unwrap_or_else(|| {
869                                panic!(
870                                    "Missing output port for message type '{msg_type}' on node {pid}"
871                                )
872                            });
873                        input_msg_indices_types.push(CuInputMsg {
874                            culist_index: output_pack.culist_index,
875                            msg_type: msg_type.to_string(),
876                            src_port,
877                            edge_id,
878                        });
879                    } else {
880                        #[cfg(all(feature = "std", feature = "macro_debug"))]
881                        eprintln!("      ✗ Input from {pid} not ready, returning");
882                        return (next_culist_output_index, handled);
883                    }
884                }
885                let msg_types = collect_output_msg_types(graph, id);
886                if msg_types.is_empty() {
887                    panic!(
888                        "Regular node '{}' has no outgoing connections",
889                        node_ref.get_id()
890                    );
891                }
892                output_msg_pack = Some(CuOutputPack {
893                    culist_index: next_culist_output_index,
894                    msg_types,
895                });
896                next_culist_output_index += 1;
897            }
898        }
899
900        sort_inputs_by_cnx_id(&mut input_msg_indices_types);
901
902        if let Some(pos) = plan
903            .iter()
904            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
905        {
906            #[cfg(all(feature = "std", feature = "macro_debug"))]
907            eprintln!("    → Already in plan, modifying existing step");
908            let mut step = plan.remove(pos);
909            if let CuExecutionUnit::Step(ref mut s) = step {
910                s.input_msg_indices_types = input_msg_indices_types;
911            }
912            plan.push(step);
913        } else {
914            #[cfg(all(feature = "std", feature = "macro_debug"))]
915            eprintln!("    → New step added to plan");
916            let step = CuExecutionStep {
917                node_id: id,
918                node: node_ref.clone(),
919                task_type,
920                input_msg_indices_types,
921                output_msg_pack,
922            };
923            plan.push(CuExecutionUnit::Step(step));
924        }
925
926        handled = true;
927    }
928
929    #[cfg(all(feature = "std", feature = "macro_debug"))]
930    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
931    (next_culist_output_index, handled)
932}
933
934/// This is the main heuristics to compute an execution plan at compilation time.
935/// TODO(gbin): Make that heuristic pluggable.
936pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
937    #[cfg(all(feature = "std", feature = "macro_debug"))]
938    eprintln!("[runtime plan]");
939    let mut plan = Vec::new();
940    let mut next_culist_output_index = 0u32;
941
942    let mut queue: VecDeque<NodeId> = graph
943        .node_ids()
944        .into_iter()
945        .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
946        .collect();
947
948    #[cfg(all(feature = "std", feature = "macro_debug"))]
949    eprintln!("Initial source nodes: {queue:?}");
950
951    while let Some(start_node) = queue.pop_front() {
952        #[cfg(all(feature = "std", feature = "macro_debug"))]
953        eprintln!("→ Starting BFS from source {start_node}");
954        for node_id in graph.bfs_nodes(start_node) {
955            let already_in_plan = plan
956                .iter()
957                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
958            if already_in_plan {
959                #[cfg(all(feature = "std", feature = "macro_debug"))]
960                eprintln!("    → Node {node_id} already planned, skipping");
961                continue;
962            }
963
964            #[cfg(all(feature = "std", feature = "macro_debug"))]
965            eprintln!("    Planning from node {node_id}");
966            let (new_index, handled) =
967                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
968            next_culist_output_index = new_index;
969
970            if !handled {
971                #[cfg(all(feature = "std", feature = "macro_debug"))]
972                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
973                continue;
974            }
975
976            #[cfg(all(feature = "std", feature = "macro_debug"))]
977            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
978            for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
979                #[cfg(all(feature = "std", feature = "macro_debug"))]
980                eprintln!("      → Enqueueing neighbor {neighbor}");
981                queue.push_back(neighbor);
982            }
983        }
984    }
985
986    let mut planned_nodes = BTreeSet::new();
987    for unit in &plan {
988        if let CuExecutionUnit::Step(step) = unit {
989            planned_nodes.insert(step.node_id);
990        }
991    }
992
993    let mut missing = Vec::new();
994    for node_id in graph.node_ids() {
995        if !planned_nodes.contains(&node_id) {
996            if let Some(node) = graph.get_node(node_id) {
997                missing.push(node.get_id().to_string());
998            } else {
999                missing.push(format!("node_id_{node_id}"));
1000            }
1001        }
1002    }
1003
1004    if !missing.is_empty() {
1005        missing.sort();
1006        return Err(CuError::from(format!(
1007            "Execution plan could not include all nodes. Missing: {}. Check for loopback or missing source connections.",
1008            missing.join(", ")
1009        )));
1010    }
1011
1012    Ok(CuExecutionLoop {
1013        steps: plan,
1014        loop_count: None,
1015    })
1016}
1017
1018//tests
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022    use crate::config::Node;
1023    use crate::context::CuContext;
1024    use crate::cutask::CuSinkTask;
1025    use crate::cutask::{CuSrcTask, Freezable};
1026    use crate::monitoring::NoMonitor;
1027    use crate::reflect::Reflect;
1028    use bincode::Encode;
1029    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
1030    use serde_derive::{Deserialize, Serialize};
1031
    /// Minimal no-op source task used by the runtime tests: its output type is
    /// `()` and `process` leaves the output message untouched.
    #[derive(Reflect)]
    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // No-op: nothing is written into the `()` output message.
        fn process(&mut self, _ctx: &CuContext, _empty_msg: &mut Self::Output<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1051
    /// Minimal no-op sink task used by the runtime tests: accepts a `()` input
    /// and discards it.
    #[derive(Reflect)]
    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        // No-op: the input message is ignored.
        fn process(&mut self, _ctx: &CuContext, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
1072
    // Those should be generated by the derive macro in a real application.
    type Tasks = (TestSource, TestSink);

    /// Message tuple for the two-task test graph: a single `()` payload.
    #[derive(Debug, Encode, Decode, Serialize, Deserialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        // No stamped payloads to expose for monitoring/logging in the tests.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        // No task ids to match against in the tests.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        // `()` needs no zero-initialization.
        fn init_zeroed(&mut self) {}
    }
1094
1095    #[cfg(feature = "std")]
1096    fn tasks_instanciator(
1097        all_instances_configs: Vec<Option<&ComponentConfig>>,
1098        _resources: &mut ResourceManager,
1099    ) -> CuResult<Tasks> {
1100        Ok((
1101            TestSource::new(all_instances_configs[0], ())?,
1102            TestSink::new(all_instances_configs[1], ())?,
1103        ))
1104    }
1105
1106    #[cfg(not(feature = "std"))]
1107    fn tasks_instanciator(
1108        all_instances_configs: Vec<Option<&ComponentConfig>>,
1109        _resources: &mut ResourceManager,
1110    ) -> CuResult<Tasks> {
1111        Ok((
1112            TestSource::new(all_instances_configs[0], ())?,
1113            TestSink::new(all_instances_configs[1], ())?,
1114        ))
1115    }
1116
    /// Monitor factory for the tests: the no-op monitor is enough.
    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    /// Bridge factory for the tests: no bridges to set up.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    /// Resource factory for the tests: an empty resource manager.
    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }
1128
    /// Write sink that discards everything; stands in for the runtime's log
    /// streams in the tests.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        // Accept and drop any encodable object.
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
1137
    /// Smoke test: a minimal source→sink graph can be turned into a runtime.
    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        // Const generic `2` sizes the copper-list pool; the two FakeWriters
        // are the discard-everything log streams.
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }
1158
    /// Exercises the copper-list pool lifecycle: with a pool of 2, lists are
    /// created until exhaustion, then `end_of_processing` frees them — but a
    /// list is only reclaimed once everything above it on the stack is done.
    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulates the generated runtime
        {
            // First list: ids are handed out sequentially starting at 0.
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            // Second list exhausts the pool of 2.
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            // Pool exhausted: create() must fail until a list is freed.
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order, should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Readd a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, the #0 first
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Free up the top of the stack
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs

            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }
1239
    /// Verifies that a sink's inputs are ordered by edge (connection) id —
    /// i.e. the order in which connections were created — not by source node id.
    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // note that the source2 connection is before the source1
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // the edge id depends on the order the connection is created, not
        // on the node id, and that is what determines the input order
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        // Locate the sink's execution step in the computed plan.
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // since the src2 connection was added before src1 connection, the src2 type should be
        // first
        assert_eq!(sink_step.input_msg_indices_types[0].msg_type, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].msg_type, src1_type);
    }
1279
1280    #[test]
1281    fn test_runtime_output_ports_unique_ordered() {
1282        let mut config = CuConfig::default();
1283        let graph = config.get_graph_mut(None).unwrap();
1284        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1285        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1286        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1287        let dst_a2_id = graph.add_node(Node::new("dst_a2", "SinkA2")).unwrap();
1288        let dst_c_id = graph.add_node(Node::new("dst_c", "SinkC")).unwrap();
1289
1290        graph.connect(src_id, dst_a_id, "msg::A").unwrap();
1291        graph.connect(src_id, dst_b_id, "msg::B").unwrap();
1292        graph.connect(src_id, dst_a2_id, "msg::A").unwrap();
1293        graph.connect(src_id, dst_c_id, "msg::C").unwrap();
1294
1295        let runtime = compute_runtime_plan(graph).unwrap();
1296        let src_step = runtime
1297            .steps
1298            .iter()
1299            .find_map(|step| match step {
1300                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1301                _ => None,
1302            })
1303            .unwrap();
1304
1305        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1306        assert_eq!(output_pack.msg_types, vec!["msg::A", "msg::B", "msg::C"]);
1307
1308        let dst_a_step = runtime
1309            .steps
1310            .iter()
1311            .find_map(|step| match step {
1312                CuExecutionUnit::Step(step) if step.node_id == dst_a_id => Some(step),
1313                _ => None,
1314            })
1315            .unwrap();
1316        let dst_b_step = runtime
1317            .steps
1318            .iter()
1319            .find_map(|step| match step {
1320                CuExecutionUnit::Step(step) if step.node_id == dst_b_id => Some(step),
1321                _ => None,
1322            })
1323            .unwrap();
1324        let dst_a2_step = runtime
1325            .steps
1326            .iter()
1327            .find_map(|step| match step {
1328                CuExecutionUnit::Step(step) if step.node_id == dst_a2_id => Some(step),
1329                _ => None,
1330            })
1331            .unwrap();
1332        let dst_c_step = runtime
1333            .steps
1334            .iter()
1335            .find_map(|step| match step {
1336                CuExecutionUnit::Step(step) if step.node_id == dst_c_id => Some(step),
1337                _ => None,
1338            })
1339            .unwrap();
1340
1341        assert_eq!(dst_a_step.input_msg_indices_types[0].src_port, 0);
1342        assert_eq!(dst_b_step.input_msg_indices_types[0].src_port, 1);
1343        assert_eq!(dst_a2_step.input_msg_indices_types[0].src_port, 0);
1344        assert_eq!(dst_c_step.input_msg_indices_types[0].src_port, 2);
1345    }
1346
1347    #[test]
1348    fn test_runtime_output_ports_fanout_single() {
1349        let mut config = CuConfig::default();
1350        let graph = config.get_graph_mut(None).unwrap();
1351        let src_id = graph.add_node(Node::new("src", "Source")).unwrap();
1352        let dst_a_id = graph.add_node(Node::new("dst_a", "SinkA")).unwrap();
1353        let dst_b_id = graph.add_node(Node::new("dst_b", "SinkB")).unwrap();
1354
1355        graph.connect(src_id, dst_a_id, "i32").unwrap();
1356        graph.connect(src_id, dst_b_id, "i32").unwrap();
1357
1358        let runtime = compute_runtime_plan(graph).unwrap();
1359        let src_step = runtime
1360            .steps
1361            .iter()
1362            .find_map(|step| match step {
1363                CuExecutionUnit::Step(step) if step.node_id == src_id => Some(step),
1364                _ => None,
1365            })
1366            .unwrap();
1367
1368        let output_pack = src_step.output_msg_pack.as_ref().unwrap();
1369        assert_eq!(output_pack.msg_types, vec!["i32"]);
1370    }
1371
    /// Diamond topology regression test (variant 1): cam0 feeds both the sink
    /// directly and an intermediate transform that also feeds the sink, with
    /// the cam0→sink edge created before cam0→inf0.
    #[test]
    fn test_runtime_plan_diamond_case1() {
        // more complex topology that tripped the scheduler
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        // NOTE(review): get_src_edges appears to return edges sorted such that
        // the first entry is cam0→broadcast here — asserted below.
        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        // Locate the merging sink's step in the computed plan.
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Inputs are ordered by edge id: the i32 (direct) edge precedes f32.
        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
    }
1411
    /// Diamond topology regression test (variant 2): same graph as case 1 but
    /// with the cam0→inf0 edge created before cam0→broadcast; the resulting
    /// sink input order must be the same.
    #[test]
    fn test_runtime_plan_diamond_case2() {
        // more complex topology that tripped the scheduler variation 2
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        // NOTE(review): mirror of case 1 — here the first src edge of cam0 is
        // cam0→inf0; asserted below.
        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        // Locate the merging sink's step in the computed plan.
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Same expectation as case 1: i32 input first, then f32.
        assert_eq!(broadcast_step.input_msg_indices_types[0].msg_type, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].msg_type, "f32");
    }
1451}