cu29_runtime/
curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in their application struct.
//!
4
5use crate::config::{ComponentConfig, CuDirection, Node, DEFAULT_KEYFRAME_INTERVAL};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::{build_monitor_topology, CuMonitor};
10use crate::resource::ResourceManager;
11use cu29_clock::{ClockProvider, CuTime, RobotClock};
12use cu29_traits::CuResult;
13use cu29_traits::WriteStream;
14use cu29_traits::{CopperListTuple, CuError};
15
16use bincode::enc::write::{SizeWriter, SliceWriter};
17use bincode::enc::EncoderImpl;
18use bincode::error::EncodeError;
19use bincode::{Decode, Encode};
20use core::fmt::{Debug, Formatter};
21use petgraph::prelude::*;
22use petgraph::visit::VisitMap;
23use petgraph::visit::Visitable;
24
25#[cfg(not(feature = "std"))]
26mod imp {
27    pub use alloc::boxed::Box;
28    pub use alloc::collections::VecDeque;
29    pub use alloc::format;
30    pub use alloc::string::String;
31    pub use alloc::string::ToString;
32    pub use alloc::vec::Vec;
33    pub use core::fmt::Result as FmtResult;
34}
35
/// `std` flavor of the platform-dependent imports (logging, thread pool, sync primitives).
#[cfg(feature = "std")]
mod imp {
    pub use cu29_log_runtime::LoggerRuntime;
    pub use cu29_unifiedlog::UnifiedLoggerWrite;
    pub use rayon::ThreadPool;
    pub use std::collections::VecDeque;
    pub use std::fmt::Result as FmtResult;
    pub use std::sync::{Arc, Mutex};
}
45
46use imp::*;
47
/// Simple bundle of the handles needed to run a Copper application.
///
/// Only compiled when the `std` feature is enabled.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Writer of the unified log, shareable behind an `Arc<Mutex<_>>`.
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Handle on the logger runtime (see `cu29_log_runtime`).
    pub logger_runtime: LoggerRuntime,
    /// The robot clock.
    pub clock: RobotClock,
}
55
56/// Manages the lifecycle of the copper lists and logging.
57pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
58    pub inner: CuListsManager<P, NBCL>,
59    /// Logger for the copper lists (messages between tasks)
60    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
61}
62
63impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
64    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
65        let mut is_top = true;
66        let mut nb_done = 0;
67        for cl in self.inner.iter_mut() {
68            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
69                cl.change_state(CopperListState::DoneProcessing);
70            }
71            if is_top && cl.get_state() == CopperListState::DoneProcessing {
72                if let Some(logger) = &mut self.logger {
73                    cl.change_state(CopperListState::BeingSerialized);
74                    logger.log(cl)?;
75                }
76                cl.change_state(CopperListState::Free);
77                nb_done += 1;
78            } else {
79                is_top = false;
80            }
81        }
82        for _ in 0..nb_done {
83            let _ = self.inner.pop();
84        }
85        Ok(())
86    }
87
88    pub fn available_copper_lists(&self) -> usize {
89        NBCL - self.inner.len()
90    }
91}
92
93/// Manages the frozen tasks state and logging.
94pub struct KeyFramesManager {
95    /// Where the serialized tasks are stored following the wave of execution of a CL.
96    inner: KeyFrame,
97
98    /// Logger for the state of the tasks (frozen tasks)
99    logger: Option<Box<dyn WriteStream<KeyFrame>>>,
100
101    /// Capture a keyframe only each...
102    keyframe_interval: u32,
103}
104
105impl KeyFramesManager {
106    fn is_keyframe(&self, culistid: u32) -> bool {
107        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
108    }
109
110    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
111        if self.is_keyframe(culistid) {
112            self.inner.reset(culistid, clock.now());
113        }
114    }
115
116    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
117        if self.is_keyframe(culistid) {
118            if self.inner.culistid != culistid {
119                panic!("Freezing task for a different culistid");
120            }
121            self.inner
122                .add_frozen_task(task)
123                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
124        } else {
125            Ok(0)
126        }
127    }
128
129    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
130        if self.is_keyframe(culistid) {
131            let logger = self.logger.as_mut().unwrap();
132            logger.log(&self.inner)
133        } else {
134            Ok(())
135        }
136    }
137}
138
139/// This is the main structure that will be injected as a member of the Application struct.
140/// CT is the tuple of all the tasks in order of execution.
141/// CL is the type of the copper list, representing the input/output messages for all the tasks.
142pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
143    /// The base clock the runtime will be using to record time.
144    pub clock: RobotClock, // TODO: remove public at some point
145
146    /// The tuple of all the tasks in order of execution.
147    pub tasks: CT,
148
149    /// Tuple of all instantiated bridges.
150    pub bridges: CB,
151
152    /// Resource registry kept alive for tasks borrowing shared handles.
153    pub resources: ResourceManager,
154
155    /// For backgrounded tasks.
156    #[cfg(feature = "std")]
157    pub threadpool: Arc<ThreadPool>,
158
159    /// The runtime monitoring.
160    pub monitor: M,
161
162    /// The logger for the copper lists (messages between tasks)
163    pub copperlists_manager: CopperListsManager<P, NBCL>,
164
165    /// The logger for the state of the tasks (frozen tasks)
166    pub keyframes_manager: KeyFramesManager,
167
168    /// The runtime configuration controlling the behavior of the run loop
169    pub runtime_config: RuntimeConfig,
170}
171
172/// To be able to share the clock we make the runtime a clock provider.
173impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
174    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
175{
176    fn get_clock(&self) -> RobotClock {
177        self.clock.clone()
178    }
179}
180
181/// A KeyFrame is recording a snapshot of the tasks state before a given copperlist.
182/// It is a double encapsulation: this one recording the culistid and another even in
183/// bincode in the serialized_tasks.
184#[derive(Encode, Decode)]
185pub struct KeyFrame {
186    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
187    pub culistid: u32,
188    // This is the timestamp when the keyframe was created, using the robot clock.
189    pub timestamp: CuTime,
190    // This is the bincode representation of the tuple of all the tasks.
191    pub serialized_tasks: Vec<u8>,
192}
193
194impl KeyFrame {
195    fn new() -> Self {
196        KeyFrame {
197            culistid: 0,
198            timestamp: CuTime::default(),
199            serialized_tasks: Vec::new(),
200        }
201    }
202
203    /// This is to be able to avoid reallocations
204    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
205        self.culistid = culistid;
206        self.timestamp = timestamp;
207        self.serialized_tasks.clear();
208    }
209
210    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
211    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
212        let cfg = bincode::config::standard();
213        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
214        BincodeAdapter(task).encode(&mut sizer)?;
215        let need = sizer.into_writer().bytes_written as usize;
216
217        let start = self.serialized_tasks.len();
218        self.serialized_tasks.resize(start + need, 0);
219        let mut enc =
220            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
221        BincodeAdapter(task).encode(&mut enc)?;
222        Ok(need)
223    }
224}
225
226impl<
227        CT,
228        CB,
229        P: CopperListTuple + CuListZeroedInit + Default + 'static,
230        M: CuMonitor,
231        const NBCL: usize,
232    > CuRuntime<CT, CB, P, M, NBCL>
233{
234    // FIXME(gbin): this became REALLY ugly with no-std
235    #[allow(clippy::too_many_arguments)]
236    #[cfg(feature = "std")]
237    pub fn new(
238        clock: RobotClock,
239        config: &CuConfig,
240        mission: Option<&str>,
241        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
242        tasks_instanciator: impl for<'c> Fn(
243            Vec<Option<&'c ComponentConfig>>,
244            &mut ResourceManager,
245            Arc<ThreadPool>,
246        ) -> CuResult<CT>,
247        monitor_instanciator: impl Fn(&CuConfig) -> M,
248        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
249        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
250        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
251    ) -> CuResult<Self> {
252        let graph = config.get_graph(mission)?;
253        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
254            .get_all_nodes()
255            .iter()
256            .map(|(_, node)| node.get_instance_config())
257            .collect();
258
259        // TODO: make that configurable
260
261        let threadpool = Arc::new(
262            rayon::ThreadPoolBuilder::new()
263                .num_threads(2) // default to 4 threads if not specified
264                .build()
265                .expect("Could not create the threadpool"),
266        );
267
268        let mut resources = resources_instanciator(config)?;
269
270        let tasks = tasks_instanciator(all_instances_configs, &mut resources, threadpool.clone())?;
271        let mut monitor = monitor_instanciator(config);
272        if let Ok(topology) = build_monitor_topology(config, mission) {
273            monitor.set_topology(topology);
274        }
275        let bridges = bridges_instanciator(config, &mut resources)?;
276
277        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
278            Some(logging_config) if logging_config.enable_task_logging => (
279                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
280                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
281                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
282            ),
283            Some(_) => (None, None, 0), // explicit no enable logging
284            None => (
285                // default
286                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
287                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
288                DEFAULT_KEYFRAME_INTERVAL,
289            ),
290        };
291
292        let copperlists_manager = CopperListsManager {
293            inner: CuListsManager::new(),
294            logger: copperlists_logger,
295        };
296
297        let keyframes_manager = KeyFramesManager {
298            inner: KeyFrame::new(),
299            logger: keyframes_logger,
300            keyframe_interval,
301        };
302
303        let runtime_config = config.runtime.clone().unwrap_or_default();
304
305        let runtime = Self {
306            tasks,
307            bridges,
308            resources,
309            threadpool,
310            monitor,
311            clock,
312            copperlists_manager,
313            keyframes_manager,
314            runtime_config,
315        };
316
317        Ok(runtime)
318    }
319
320    #[allow(clippy::too_many_arguments)]
321    #[cfg(not(feature = "std"))]
322    pub fn new(
323        clock: RobotClock,
324        config: &CuConfig,
325        mission: Option<&str>,
326        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
327        tasks_instanciator: impl for<'c> Fn(
328            Vec<Option<&'c ComponentConfig>>,
329            &mut ResourceManager,
330        ) -> CuResult<CT>,
331        monitor_instanciator: impl Fn(&CuConfig) -> M,
332        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
333        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
334        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
335    ) -> CuResult<Self> {
336        let graph = config.get_graph(mission)?;
337        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
338            .get_all_nodes()
339            .iter()
340            .map(|(_, node)| node.get_instance_config())
341            .collect();
342
343        let mut resources = resources_instanciator(config)?;
344
345        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
346
347        let mut monitor = monitor_instanciator(config);
348        if let Ok(topology) = build_monitor_topology(config, mission) {
349            monitor.set_topology(topology);
350        }
351        let bridges = bridges_instanciator(config, &mut resources)?;
352
353        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
354            Some(logging_config) if logging_config.enable_task_logging => (
355                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
356                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
357                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
358            ),
359            Some(_) => (None, None, 0), // explicit no enable logging
360            None => (
361                // default
362                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
363                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
364                DEFAULT_KEYFRAME_INTERVAL,
365            ),
366        };
367
368        let copperlists_manager = CopperListsManager {
369            inner: CuListsManager::new(),
370            logger: copperlists_logger,
371        };
372
373        let keyframes_manager = KeyFramesManager {
374            inner: KeyFrame::new(),
375            logger: keyframes_logger,
376            keyframe_interval,
377        };
378
379        let runtime_config = config.runtime.clone().unwrap_or_default();
380
381        let runtime = Self {
382            tasks,
383            bridges,
384            #[cfg(feature = "std")]
385            threadpool,
386            resources,
387            monitor,
388            clock,
389            copperlists_manager,
390            keyframes_manager,
391            runtime_config,
392        };
393
394        Ok(runtime)
395    }
396}
397
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// Task producing messages without consuming any.
    Source,
    /// Task consuming and producing messages.
    Regular,
    /// Task consuming messages without producing any.
    Sink,
}
408
409/// This structure represents a step in the execution plan.
410pub struct CuExecutionStep {
411    /// NodeId: node id of the task to execute
412    pub node_id: NodeId,
413    /// Node: node instance
414    pub node: Node,
415    /// CuTaskType: type of the task
416    pub task_type: CuTaskType,
417
418    /// the indices in the copper list of the input messages and their types
419    pub input_msg_indices_types: Vec<(u32, String)>,
420
421    /// the index in the copper list of the output message and its type
422    pub output_msg_index_type: Option<(u32, String)>,
423}
424
425impl Debug for CuExecutionStep {
426    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
427        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
428        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
429        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
430        f.write_str(
431            format!(
432                "              input_msg_types: {:?}\n",
433                self.input_msg_indices_types
434            )
435            .as_str(),
436        )?;
437        f.write_str(
438            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
439        )?;
440        Ok(())
441    }
442}
443
444/// This structure represents a loop in the execution plan.
445/// It is used to represent a sequence of Execution units (loop or steps) that are executed
446/// multiple times.
447/// if loop_count is None, the loop is infinite.
448pub struct CuExecutionLoop {
449    pub steps: Vec<CuExecutionUnit>,
450    pub loop_count: Option<u32>,
451}
452
453impl Debug for CuExecutionLoop {
454    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
455        f.write_str("CuExecutionLoop:\n")?;
456        for step in &self.steps {
457            match step {
458                CuExecutionUnit::Step(step) => {
459                    step.fmt(f)?;
460                }
461                CuExecutionUnit::Loop(l) => {
462                    l.fmt(f)?;
463                }
464            }
465        }
466
467        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
468        Ok(())
469    }
470}
471
472/// This structure represents a step in the execution plan.
473#[derive(Debug)]
474pub enum CuExecutionUnit {
475    Step(CuExecutionStep),
476    Loop(CuExecutionLoop),
477}
478
479fn find_output_index_type_from_nodeid(
480    node_id: NodeId,
481    steps: &Vec<CuExecutionUnit>,
482) -> Option<(u32, String)> {
483    for step in steps {
484        match step {
485            CuExecutionUnit::Loop(loop_unit) => {
486                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
487                    return Some(index);
488                }
489            }
490            CuExecutionUnit::Step(step) => {
491                if step.node_id == node_id {
492                    return step.output_msg_index_type.clone();
493                }
494            }
495        }
496    }
497    None
498}
499
500pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
501    if graph.incoming_neighbor_count(node_id) == 0 {
502        CuTaskType::Source
503    } else if graph.outgoing_neighbor_count(node_id) == 0 {
504        CuTaskType::Sink
505    } else {
506        CuTaskType::Regular
507    }
508}
509
510/// This function gets the input node by using the input step plan id, to get the edge that
511/// connects the input to the output in the config graph
512fn find_edge_with_plan_input_id(
513    plan: &[CuExecutionUnit],
514    graph: &CuGraph,
515    plan_id: u32,
516    output_node_id: NodeId,
517) -> usize {
518    let input_node = plan
519        .get(plan_id as usize)
520        .expect("Input step should've been added to plan before the step that receives the input");
521    let CuExecutionUnit::Step(input_step) = input_node else {
522        panic!("Expected input to be from a step, not a loop");
523    };
524    let input_node_id = input_step.node_id;
525
526    graph
527        .0
528        .edges_connecting(input_node_id.into(), output_node_id.into())
529        .map(|edge| edge.id().index())
530        .next()
531        .expect("An edge connecting the input to the output should exist")
532}
533
534/// The connection id used here is the index of the config graph edge that equates to the wanted
535/// connection
536fn sort_inputs_by_cnx_id(
537    input_msg_indices_types: &mut [(u32, String)],
538    plan: &[CuExecutionUnit],
539    graph: &CuGraph,
540    curr_node_id: NodeId,
541) {
542    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
543        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
544        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
545        a_edge_id.cmp(&b_edge_id)
546    });
547}
548/// Explores a subbranch and build the partial plan out of it.
549fn plan_tasks_tree_branch(
550    graph: &CuGraph,
551    mut next_culist_output_index: u32,
552    starting_point: NodeId,
553    plan: &mut Vec<CuExecutionUnit>,
554) -> (u32, bool) {
555    #[cfg(all(feature = "std", feature = "macro_debug"))]
556    eprintln!("-- starting branch from node {starting_point}");
557
558    let mut visitor = Bfs::new(&graph.0, starting_point.into());
559    let mut handled = false;
560
561    while let Some(node) = visitor.next(&graph.0) {
562        let id = node.index() as NodeId;
563        let node_ref = graph.get_node(id).unwrap();
564        #[cfg(all(feature = "std", feature = "macro_debug"))]
565        eprintln!("  Visiting node: {node_ref:?}");
566
567        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
568        let output_msg_index_type: Option<(u32, String)>;
569        let task_type = find_task_type_for_id(graph, id);
570
571        match task_type {
572            CuTaskType::Source => {
573                #[cfg(all(feature = "std", feature = "macro_debug"))]
574                eprintln!("    → Source node, assign output index {next_culist_output_index}");
575                output_msg_index_type = Some((
576                    next_culist_output_index,
577                    graph
578                        .0
579                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
580                        .unwrap() // FIXME(gbin): Error handling
581                        .msg
582                        .clone(),
583                ));
584                next_culist_output_index += 1;
585            }
586            CuTaskType::Sink => {
587                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
588                #[cfg(all(feature = "std", feature = "macro_debug"))]
589                eprintln!("    → Sink with parents: {parents:?}");
590                for parent in parents {
591                    let pid = parent;
592                    let index_type = find_output_index_type_from_nodeid(pid, plan);
593                    if let Some(index_type) = index_type {
594                        #[cfg(all(feature = "std", feature = "macro_debug"))]
595                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
596                        input_msg_indices_types.push(index_type);
597                    } else {
598                        #[cfg(all(feature = "std", feature = "macro_debug"))]
599                        eprintln!("      ✗ Input from {pid} not ready, returning");
600                        return (next_culist_output_index, handled);
601                    }
602                }
603                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
604                next_culist_output_index += 1;
605            }
606            CuTaskType::Regular => {
607                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
608                #[cfg(all(feature = "std", feature = "macro_debug"))]
609                eprintln!("    → Regular task with parents: {parents:?}");
610                for parent in parents {
611                    let pid = parent;
612                    let index_type = find_output_index_type_from_nodeid(pid, plan);
613                    if let Some(index_type) = index_type {
614                        #[cfg(all(feature = "std", feature = "macro_debug"))]
615                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
616                        input_msg_indices_types.push(index_type);
617                    } else {
618                        #[cfg(all(feature = "std", feature = "macro_debug"))]
619                        eprintln!("      ✗ Input from {pid} not ready, returning");
620                        return (next_culist_output_index, handled);
621                    }
622                }
623                output_msg_index_type = Some((
624                    next_culist_output_index,
625                    graph
626                        .0
627                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
628                        .unwrap()
629                        .msg
630                        .clone(),
631                ));
632                next_culist_output_index += 1;
633            }
634        }
635
636        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);
637
638        if let Some(pos) = plan
639            .iter()
640            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
641        {
642            #[cfg(all(feature = "std", feature = "macro_debug"))]
643            eprintln!("    → Already in plan, modifying existing step");
644            let mut step = plan.remove(pos);
645            if let CuExecutionUnit::Step(ref mut s) = step {
646                s.input_msg_indices_types = input_msg_indices_types;
647            }
648            plan.push(step);
649        } else {
650            #[cfg(all(feature = "std", feature = "macro_debug"))]
651            eprintln!("    → New step added to plan");
652            let step = CuExecutionStep {
653                node_id: id,
654                node: node_ref.clone(),
655                task_type,
656                input_msg_indices_types,
657                output_msg_index_type,
658            };
659            plan.push(CuExecutionUnit::Step(step));
660        }
661
662        handled = true;
663    }
664
665    #[cfg(all(feature = "std", feature = "macro_debug"))]
666    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
667    (next_culist_output_index, handled)
668}
669
670/// This is the main heuristics to compute an execution plan at compilation time.
671/// TODO(gbin): Make that heuristic pluggable.
672pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
673    #[cfg(all(feature = "std", feature = "macro_debug"))]
674    eprintln!("[runtime plan]");
675    let visited = graph.0.visit_map();
676    let mut plan = Vec::new();
677    let mut next_culist_output_index = 0u32;
678
679    let mut queue: VecDeque<NodeId> = graph
680        .node_indices()
681        .iter()
682        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
683        .map(|node| node.index() as NodeId)
684        .collect();
685
686    #[cfg(all(feature = "std", feature = "macro_debug"))]
687    eprintln!("Initial source nodes: {queue:?}");
688
689    while let Some(start_node) = queue.pop_front() {
690        if visited.is_visited(&start_node) {
691            #[cfg(all(feature = "std", feature = "macro_debug"))]
692            eprintln!("→ Skipping already visited source {start_node}");
693            continue;
694        }
695
696        #[cfg(all(feature = "std", feature = "macro_debug"))]
697        eprintln!("→ Starting BFS from source {start_node}");
698        let mut bfs = Bfs::new(&graph.0, start_node.into());
699
700        while let Some(node_index) = bfs.next(&graph.0) {
701            let node_id = node_index.index() as NodeId;
702            let already_in_plan = plan
703                .iter()
704                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
705            if already_in_plan {
706                #[cfg(all(feature = "std", feature = "macro_debug"))]
707                eprintln!("    → Node {node_id} already planned, skipping");
708                continue;
709            }
710
711            #[cfg(all(feature = "std", feature = "macro_debug"))]
712            eprintln!("    Planning from node {node_id}");
713            let (new_index, handled) =
714                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
715            next_culist_output_index = new_index;
716
717            if !handled {
718                #[cfg(all(feature = "std", feature = "macro_debug"))]
719                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
720                continue;
721            }
722
723            #[cfg(all(feature = "std", feature = "macro_debug"))]
724            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
725            for neighbor in graph.0.neighbors(node_index) {
726                if !visited.is_visited(&neighbor) {
727                    let nid = neighbor.index() as NodeId;
728                    #[cfg(all(feature = "std", feature = "macro_debug"))]
729                    eprintln!("      → Enqueueing neighbor {nid}");
730                    queue.push_back(nid);
731                }
732            }
733        }
734    }
735
736    Ok(CuExecutionLoop {
737        steps: plan,
738        loop_count: None,
739    })
740}
741
742//tests
743#[cfg(test)]
744mod tests {
745    use super::*;
746    use crate::config::Node;
747    use crate::cutask::CuSinkTask;
748    use crate::cutask::{CuSrcTask, Freezable};
749    use crate::monitoring::NoMonitor;
750    use bincode::Encode;
751    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
752    use serde_derive::Serialize;
753
754    pub struct TestSource {}
755
756    impl Freezable for TestSource {}
757
758    impl CuSrcTask for TestSource {
759        type Resources<'r> = ();
760        type Output<'m> = ();
761        fn new_with(
762            _config: Option<&ComponentConfig>,
763            _resources: Self::Resources<'_>,
764        ) -> CuResult<Self>
765        where
766            Self: Sized,
767        {
768            Ok(Self {})
769        }
770
771        fn process(
772            &mut self,
773            _clock: &RobotClock,
774            _empty_msg: &mut Self::Output<'_>,
775        ) -> CuResult<()> {
776            Ok(())
777        }
778    }
779
780    pub struct TestSink {}
781
782    impl Freezable for TestSink {}
783
784    impl CuSinkTask for TestSink {
785        type Resources<'r> = ();
786        type Input<'m> = ();
787
788        fn new_with(
789            _config: Option<&ComponentConfig>,
790            _resources: Self::Resources<'_>,
791        ) -> CuResult<Self>
792        where
793            Self: Sized,
794        {
795            Ok(Self {})
796        }
797
798        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
799            Ok(())
800        }
801    }
802
803    // Those should be generated by the derive macro
804    type Tasks = (TestSource, TestSink);
805
806    #[derive(Debug, Encode, Decode, Serialize, Default)]
807    struct Msgs(());
808
809    impl ErasedCuStampedDataSet for Msgs {
810        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
811            Vec::new()
812        }
813    }
814
815    impl MatchingTasks for Msgs {
816        fn get_all_task_ids() -> &'static [&'static str] {
817            &[]
818        }
819    }
820
821    impl CuListZeroedInit for Msgs {
822        fn init_zeroed(&mut self) {}
823    }
824
825    #[cfg(feature = "std")]
826    fn tasks_instanciator(
827        all_instances_configs: Vec<Option<&ComponentConfig>>,
828        _resources: &mut ResourceManager,
829        _threadpool: Arc<ThreadPool>,
830    ) -> CuResult<Tasks> {
831        Ok((
832            TestSource::new(all_instances_configs[0])?,
833            TestSink::new(all_instances_configs[1])?,
834        ))
835    }
836
837    #[cfg(not(feature = "std"))]
838    fn tasks_instanciator(
839        all_instances_configs: Vec<Option<&ComponentConfig>>,
840        _resources: &mut ResourceManager,
841    ) -> CuResult<Tasks> {
842        Ok((
843            TestSource::new(all_instances_configs[0])?,
844            TestSink::new(all_instances_configs[1])?,
845        ))
846    }
847
    /// Monitor callback handed to `CuRuntime::new` in the tests: returns a `NoMonitor`
    /// and does not read `_config`.
    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }
851
    /// Bridges callback handed to `CuRuntime::new` in the tests: returns `Ok(())` without
    /// reading either argument.
    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }
855
856    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
857        Ok(ResourceManager::new(0))
858    }
859
860    #[derive(Debug)]
861    struct FakeWriter {}
862
863    impl<E: Encode> WriteStream<E> for FakeWriter {
864        fn log(&mut self, _obj: &E) -> CuResult<()> {
865            Ok(())
866        }
867    }
868
869    #[test]
870    fn test_runtime_instantiation() {
871        let mut config = CuConfig::default();
872        let graph = config.get_graph_mut(None).unwrap();
873        graph.add_node(Node::new("a", "TestSource")).unwrap();
874        graph.add_node(Node::new("b", "TestSink")).unwrap();
875        graph.connect(0, 1, "()").unwrap();
876        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
877            RobotClock::default(),
878            &config,
879            None,
880            resources_instanciator,
881            tasks_instanciator,
882            monitor_instanciator,
883            bridges_instanciator,
884            FakeWriter {},
885            FakeWriter {},
886        );
887        assert!(runtime.is_ok());
888    }
889
890    #[test]
891    fn test_copperlists_manager_lifecycle() {
892        let mut config = CuConfig::default();
893        let graph = config.get_graph_mut(None).unwrap();
894        graph.add_node(Node::new("a", "TestSource")).unwrap();
895        graph.add_node(Node::new("b", "TestSink")).unwrap();
896        graph.connect(0, 1, "()").unwrap();
897
898        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
899            RobotClock::default(),
900            &config,
901            None,
902            resources_instanciator,
903            tasks_instanciator,
904            monitor_instanciator,
905            bridges_instanciator,
906            FakeWriter {},
907            FakeWriter {},
908        )
909        .unwrap();
910
911        // Now emulates the generated runtime
912        {
913            let copperlists = &mut runtime.copperlists_manager;
914            let culist0 = copperlists
915                .inner
916                .create()
917                .expect("Ran out of space for copper lists");
918            // FIXME: error handling.
919            let id = culist0.id;
920            assert_eq!(id, 0);
921            culist0.change_state(CopperListState::Processing);
922            assert_eq!(copperlists.available_copper_lists(), 1);
923        }
924
925        {
926            let copperlists = &mut runtime.copperlists_manager;
927            let culist1 = copperlists
928                .inner
929                .create()
930                .expect("Ran out of space for copper lists"); // FIXME: error handling.
931            let id = culist1.id;
932            assert_eq!(id, 1);
933            culist1.change_state(CopperListState::Processing);
934            assert_eq!(copperlists.available_copper_lists(), 0);
935        }
936
937        {
938            let copperlists = &mut runtime.copperlists_manager;
939            let culist2 = copperlists.inner.create();
940            assert!(culist2.is_none());
941            assert_eq!(copperlists.available_copper_lists(), 0);
942            // Free in order, should let the top of the stack be serialized and freed.
943            let _ = copperlists.end_of_processing(1);
944            assert_eq!(copperlists.available_copper_lists(), 1);
945        }
946
947        // Readd a CL
948        {
949            let copperlists = &mut runtime.copperlists_manager;
950            let culist2 = copperlists
951                .inner
952                .create()
953                .expect("Ran out of space for copper lists"); // FIXME: error handling.
954            let id = culist2.id;
955            assert_eq!(id, 2);
956            culist2.change_state(CopperListState::Processing);
957            assert_eq!(copperlists.available_copper_lists(), 0);
958            // Free out of order, the #0 first
959            let _ = copperlists.end_of_processing(0);
960            // Should not free up the top of the stack
961            assert_eq!(copperlists.available_copper_lists(), 0);
962
963            // Free up the top of the stack
964            let _ = copperlists.end_of_processing(2);
965            // This should free up 2 CLs
966
967            assert_eq!(copperlists.available_copper_lists(), 2);
968        }
969    }
970
971    #[test]
972    fn test_runtime_task_input_order() {
973        let mut config = CuConfig::default();
974        let graph = config.get_graph_mut(None).unwrap();
975        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
976        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
977        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
978
979        assert_eq!(src1_id, 0);
980        assert_eq!(src2_id, 1);
981
982        // note that the source2 connection is before the source1
983        let src1_type = "src1_type";
984        let src2_type = "src2_type";
985        graph.connect(src2_id, sink_id, src2_type).unwrap();
986        graph.connect(src1_id, sink_id, src1_type).unwrap();
987
988        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
989        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
990        // the edge id depends on the order the connection is created, not
991        // on the node id, and that is what determines the input order
992        assert_eq!(src1_edge_id, 1);
993        assert_eq!(src2_edge_id, 0);
994
995        let runtime = compute_runtime_plan(graph).unwrap();
996        let sink_step = runtime
997            .steps
998            .iter()
999            .find_map(|step| match step {
1000                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
1001                _ => None,
1002            })
1003            .unwrap();
1004
1005        // since the src2 connection was added before src1 connection, the src2 type should be
1006        // first
1007        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
1008        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
1009    }
1010
1011    #[test]
1012    fn test_runtime_plan_diamond_case1() {
1013        // more complex topology that tripped the scheduler
1014        let mut config = CuConfig::default();
1015        let graph = config.get_graph_mut(None).unwrap();
1016        let cam0_id = graph
1017            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1018            .unwrap();
1019        let inf0_id = graph
1020            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1021            .unwrap();
1022        let broadcast_id = graph
1023            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1024            .unwrap();
1025
1026        // case 1 order
1027        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1028        graph.connect(cam0_id, inf0_id, "i32").unwrap();
1029        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1030
1031        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1032        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
1033
1034        assert_eq!(edge_cam0_to_inf0, 0);
1035        assert_eq!(edge_cam0_to_broadcast, 1);
1036
1037        let runtime = compute_runtime_plan(graph).unwrap();
1038        let broadcast_step = runtime
1039            .steps
1040            .iter()
1041            .find_map(|step| match step {
1042                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1043                _ => None,
1044            })
1045            .unwrap();
1046
1047        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1048        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1049    }
1050
1051    #[test]
1052    fn test_runtime_plan_diamond_case2() {
1053        // more complex topology that tripped the scheduler variation 2
1054        let mut config = CuConfig::default();
1055        let graph = config.get_graph_mut(None).unwrap();
1056        let cam0_id = graph
1057            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1058            .unwrap();
1059        let inf0_id = graph
1060            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1061            .unwrap();
1062        let broadcast_id = graph
1063            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1064            .unwrap();
1065
1066        // case 2 order
1067        graph.connect(cam0_id, inf0_id, "i32").unwrap();
1068        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1069        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1070
1071        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1072        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1073
1074        assert_eq!(edge_cam0_to_broadcast, 0);
1075        assert_eq!(edge_cam0_to_inf0, 1);
1076
1077        let runtime = compute_runtime_plan(graph).unwrap();
1078        let broadcast_step = runtime
1079            .steps
1080            .iter()
1081            .find_map(|step| match step {
1082                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1083                _ => None,
1084            })
1085            .unwrap();
1086
1087        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1088        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1089    }
1090}