cu29_runtime/curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in their application struct.
//!

use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::{CuMonitor, build_monitor_topology};
use crate::resource::ResourceManager;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};

#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_log_derive::info;
#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_log_runtime::log;
#[cfg(all(target_os = "none", debug_assertions))]
#[allow(unused_imports)]
use cu29_log_runtime::log_debug_mode;
#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_value::to_value;

use alloc::boxed::Box;
use alloc::collections::VecDeque;
use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use bincode::enc::EncoderImpl;
use bincode::enc::write::{SizeWriter, SliceWriter};
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use core::fmt::Result as FmtResult;
use core::fmt::{Debug, Formatter};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;

#[cfg(feature = "std")]
use cu29_log_runtime::LoggerRuntime;
#[cfg(feature = "std")]
use cu29_unifiedlog::UnifiedLoggerWrite;
#[cfg(feature = "std")]
use std::sync::{Arc, Mutex};

/// Just a simple struct to hold the various bits needed to run a Copper application.
#[cfg(feature = "std")]
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}
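
// Illustrative sketch (hypothetical wiring; the real plumbing is generated by the
// `copper_runtime` macro and setup helpers): an application typically builds one
// `CopperContext` at startup and shares its clock with the runtime.
//
//     let ctx: CopperContext = /* produced by a setup helper */;
//     let clock = ctx.clock.clone();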

/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
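    /// Marks the copper list `culistid` as done, then walks the in-flight lists from
    /// the top of the stack (most recently created) and frees the leading run of
    /// lists that are done, serializing each one first if a logger is configured.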
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}
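
// Illustrative lifecycle sketch (mirrors `test_copperlists_manager_lifecycle`
// below, assuming NBCL = 2; `cl_id` is a hypothetical binding):
//
//     let cl = manager.inner.create().expect("no free copper list");
//     cl.change_state(CopperListState::Processing);
//     // ... tasks fill the copper list ...
//     manager.end_of_processing(cl_id)?; // logs it (if a logger is set) and frees it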

/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored, following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks)
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}
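
// Illustrative call sequence (hedged sketch; the real calls are emitted by the
// runtime): with a hypothetical `keyframe_interval` of 100, copper lists
// 0, 100, 200, ... each produce one keyframe.
//
//     manager.reset(culistid, &clock);           // start a fresh KeyFrame
//     manager.freeze_task(culistid, &my_task)?;  // once per task, in wave order
//     manager.end_of_processing(culistid)?;      // log the completed KeyFrame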

/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// P is the payload type of the copper list, representing the input/output messages for all the tasks.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    pub resources: ResourceManager,

    /// The runtime monitoring.
    pub monitor: M,

    /// The manager of the copper lists (messages between tasks) and their logger.
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The manager of the keyframes (frozen task states) and their logger.
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop.
    pub runtime_config: RuntimeConfig,
}
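
// Illustrative sketch (hypothetical names; the actual struct is generated by the
// `copper_runtime` macro): the runtime ends up embedded in the user's application
// struct roughly like this.
//
//     struct MyApp {
//         copper_runtime: CuRuntime<MyTasks, MyBridges, MyMsgs, MyMonitor, 2>,
//     }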

/// To be able to share the clock we make the runtime a clock provider.
impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

/// A KeyFrame records a snapshot of the task states just before a given copper list runs.
/// It is a double encapsulation: this struct records the culistid, and the task states
/// themselves are bincode-encoded inside `serialized_tasks`.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    // This is the timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    // This is the bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Resets the keyframe in place so the buffer can be reused without reallocating.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Accumulates tasks into the serialization buffer as they are executed after each step.
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let cfg = bincode::config::standard();
        // First pass: encode into a SizeWriter to learn how many bytes are needed.
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        // Second pass: grow the buffer once and encode directly into the reserved slice.
        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}

impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    // FIXME(gbin): this became REALLY ugly with no-std
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let resources = resources_instanciator(config)?;
        Self::new_with_resources(
            clock,
            config,
            mission,
            resources,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            copperlists_logger,
            keyframes_logger,
        )
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new_with_resources(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        mut resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
        let mut monitor = monitor_instanciator(config);
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default: task logging enabled
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: resources instanciator");
        let resources = resources_instanciator(config)?;
        Self::new_with_resources(
            clock,
            config,
            mission,
            resources,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            copperlists_logger,
            keyframes_logger,
        )
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new_with_resources(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        mut resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: get graph");
        let graph = config.get_graph(mission)?;
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: graph ok");
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: tasks instanciator");
        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator");
        let mut monitor = monitor_instanciator(config);
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator ok");
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: build monitor topology");
        if let Ok(topology) = build_monitor_topology(config, mission) {
            #[cfg(target_os = "none")]
            info!("CuRuntime::new: monitor topology ok");
            monitor.set_topology(topology);
            #[cfg(target_os = "none")]
            info!("CuRuntime::new: monitor topology set");
        }
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: bridges instanciator");
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default: task logging enabled
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}

/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}
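
// Illustrative example: in a pipeline `camera -> filter -> motor`, `camera` is a
// Source (no incoming edges), `filter` is Regular, and `motor` is a Sink (no
// outgoing edges); see `find_task_type_for_id` below for the exact rule.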

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// The indices in the copper list of the input messages and their types.
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// The index in the copper list of the output message and its type.
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                "              input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (loops or steps) that are executed
/// multiple times.
/// If `loop_count` is None, the loop is infinite.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

/// This enum represents a unit of the execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

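/// Recursively searches the plan (including nested loops) for the step produced by
/// `node_id` and returns the copper list index and type of its output message, if
/// that step has been planned yet.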
fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &Vec<CuExecutionUnit>,
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.incoming_neighbor_count(node_id) == 0 {
        CuTaskType::Source
    } else if graph.outgoing_neighbor_count(node_id) == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}

/// Resolves the input step by its plan id, then returns the index of the config-graph
/// edge that connects that input node to the given output node.
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

/// The connection id used here is the index of the config-graph edge that corresponds
/// to the wanted connection.
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}
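
// Illustrative example (mirrors `test_runtime_task_input_order` below): if
// `src2 -> sink` is connected before `src1 -> sink`, the `src2` edge gets the
// lower edge index, so the sink's inputs are ordered `[src2, src1]` regardless
// of the node ids.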

/// Explores a subbranch and builds the partial plan out of it.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// This is the main heuristic to compute an execution plan at compilation time.
/// TODO(gbin): Make that heuristic pluggable.
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!("      → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}
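
// Illustrative note on the diamond cases exercised in the tests below: for
// `cam0 -> inf0 -> broadcast` plus a direct `cam0 -> broadcast` edge, a first
// visit of `broadcast` can find `inf0`'s output not planned yet; that branch
// then aborts and `broadcast` is planned on a later pass, which is why a step
// already in the plan may be moved to the end with its inputs updated.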

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    // These would normally be generated by the derive macro.
    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulate the generated runtime.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order: this should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Re-add a CL.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, #0 first.
            let _ = copperlists.end_of_processing(0);
            // This should not free up the top of the stack.
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Free up the top of the stack; this should free up 2 CLs.
            let _ = copperlists.end_of_processing(2);
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Note that the source2 connection is made before the source1 connection.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // The edge id depends on the order in which the connection is created, not
        // on the node id, and that is what determines the input order.
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Since the src2 connection was added before the src1 connection, the src2
        // type should come first.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // A more complex topology that tripped the scheduler.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // The same topology as case 1, with the connection order varied.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}