cu29_runtime/
curuntime.rs

//! `CuRuntime` is the core of what Copper runs on the robot.
//! It is exposed to the user through the `copper_runtime` macro, which injects it as a field of their application struct.
3//!
4
5use crate::config::{ComponentConfig, CuDirection, Node, DEFAULT_KEYFRAME_INTERVAL};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::CuMonitor;
10use cu29_clock::{ClockProvider, CuTime, RobotClock};
11use cu29_traits::CuResult;
12use cu29_traits::WriteStream;
13use cu29_traits::{CopperListTuple, CuError};
14
15use bincode::enc::write::{SizeWriter, SliceWriter};
16use bincode::enc::EncoderImpl;
17use bincode::error::EncodeError;
18use bincode::{Decode, Encode};
19use core::fmt::{Debug, Formatter};
20use petgraph::prelude::*;
21use petgraph::visit::VisitMap;
22use petgraph::visit::Visitable;
23
// Platform shim for builds without the `std` feature: the heap-backed types and
// the `format!` macro are taken from `alloc`, and the formatter result type from
// `core`, so the rest of the file can name them unqualified after `use imp::*`.
#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::boxed::Box;
    pub use alloc::collections::VecDeque;
    pub use alloc::format;
    pub use alloc::string::String;
    pub use alloc::string::ToString;
    pub use alloc::vec::Vec;
    pub use core::fmt::Result as FmtResult;
}
34
// Platform shim for builds with the `std` feature: re-exports the logging,
// thread pool and synchronization types that only exist in that configuration,
// plus the `std` flavours of `VecDeque` and the formatter result type.
// `Box`, `String`, `Vec`, `ToString` and `format!` need no re-export here
// because the standard prelude already provides them.
#[cfg(feature = "std")]
mod imp {
    pub use cu29_log_runtime::LoggerRuntime;
    pub use cu29_unifiedlog::UnifiedLoggerWrite;
    pub use rayon::ThreadPool;
    pub use std::collections::VecDeque;
    pub use std::fmt::Result as FmtResult;
    pub use std::sync::{Arc, Mutex};
}
44
45use imp::*;
46
/// Plain holder for the various handles needed to run a Copper application.
///
/// Only compiled when the `std` feature is enabled.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Writer side of the unified log, shareable through `Arc<Mutex<_>>`.
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Handle on the structured logger runtime.
    /// NOTE(review): presumably has to outlive any logging done by the application - confirm.
    pub logger_runtime: LoggerRuntime,
    /// The robot clock of the application.
    pub clock: RobotClock,
}
54
/// Manages the lifecycle of the copper lists and their logging.
///
/// `P` is the payload tuple stored in each copper list and `NBCL` is the const
/// parameter forwarded to the underlying `CuListsManager`.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// The underlying store of copper lists.
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks).
    /// `None` means finished copper lists are released without being logged.
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}
61
62impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
63    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
64        let mut is_top = true;
65        let mut nb_done = 0;
66        for cl in self.inner.iter_mut() {
67            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
68                cl.change_state(CopperListState::DoneProcessing);
69            }
70            if is_top && cl.get_state() == CopperListState::DoneProcessing {
71                if let Some(logger) = &mut self.logger {
72                    cl.change_state(CopperListState::BeingSerialized);
73                    logger.log(cl)?;
74                }
75                cl.change_state(CopperListState::Free);
76                nb_done += 1;
77            } else {
78                is_top = false;
79            }
80        }
81        for _ in 0..nb_done {
82            let _ = self.inner.pop();
83        }
84        Ok(())
85    }
86
87    pub fn available_copper_lists(&self) -> usize {
88        NBCL - self.inner.len()
89    }
90}
91
/// Manages the frozen tasks state and its logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks).
    /// When it is `None`, no copper list is ever treated as a keyframe.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// A keyframe is captured only for the copper lists whose id is a multiple
    /// of this interval.
    keyframe_interval: u32,
}
103
104impl KeyFramesManager {
105    fn is_keyframe(&self, culistid: u32) -> bool {
106        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
107    }
108
109    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
110        if self.is_keyframe(culistid) {
111            self.inner.reset(culistid, clock.now());
112        }
113    }
114
115    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
116        if self.is_keyframe(culistid) {
117            if self.inner.culistid != culistid {
118                panic!("Freezing task for a different culistid");
119            }
120            self.inner
121                .add_frozen_task(task)
122                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
123        } else {
124            Ok(0)
125        }
126    }
127
128    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
129        if self.is_keyframe(culistid) {
130            let logger = self.logger.as_mut().unwrap();
131            logger.log(&self.inner)
132        } else {
133            Ok(())
134        }
135    }
136}
137
/// This is the main structure that will be injected as a member of the Application struct.
///
/// Type parameters:
/// - `CT` is the tuple of all the tasks in order of execution.
/// - `CB` is the tuple of all instantiated bridges.
/// - `P` is the payload type of the copper list, representing the input/output
///   messages for all the tasks.
/// - `M` is the monitor implementation.
/// - `NBCL` is the const parameter handed to the copper lists manager; the
///   number of available copper lists is computed from it.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// For backgrounded tasks. Only present with the `std` feature.
    #[cfg(feature = "std")]
    pub threadpool: Arc<ThreadPool>,

    /// The runtime monitoring.
    pub monitor: M,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}
167
168/// To be able to share the clock we make the runtime a clock provider.
169impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
170    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
171{
172    fn get_clock(&self) -> RobotClock {
173        self.clock.clone()
174    }
175}
176
/// A KeyFrame records a snapshot of the tasks state before a given copperlist.
/// It is a double encapsulation: this struct records the culistid and the
/// timestamp, and `serialized_tasks` holds a second, inner bincode encoding of
/// the tasks themselves.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    /// The id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    /// The timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    /// The bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}
189
190impl KeyFrame {
191    fn new() -> Self {
192        KeyFrame {
193            culistid: 0,
194            timestamp: CuTime::default(),
195            serialized_tasks: Vec::new(),
196        }
197    }
198
199    /// This is to be able to avoid reallocations
200    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
201        self.culistid = culistid;
202        self.timestamp = timestamp;
203        self.serialized_tasks.clear();
204    }
205
206    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
207    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
208        let cfg = bincode::config::standard();
209        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
210        BincodeAdapter(task).encode(&mut sizer)?;
211        let need = sizer.into_writer().bytes_written as usize;
212
213        let start = self.serialized_tasks.len();
214        self.serialized_tasks.resize(start + need, 0);
215        let mut enc =
216            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
217        BincodeAdapter(task).encode(&mut enc)?;
218        Ok(need)
219    }
220}
221
222impl<
223        CT,
224        CB,
225        P: CopperListTuple + CuListZeroedInit + Default + 'static,
226        M: CuMonitor,
227        const NBCL: usize,
228    > CuRuntime<CT, CB, P, M, NBCL>
229{
230    // FIXME(gbin): this became REALLY ugly with no-std
231    #[allow(clippy::too_many_arguments)]
232    #[cfg(feature = "std")]
233    pub fn new(
234        clock: RobotClock,
235        config: &CuConfig,
236        mission: Option<&str>,
237        tasks_instanciator: impl for<'c> Fn(
238            Vec<Option<&'c ComponentConfig>>,
239            Arc<ThreadPool>,
240        ) -> CuResult<CT>,
241        monitor_instanciator: impl Fn(&CuConfig) -> M,
242        bridges_instanciator: impl Fn(&CuConfig) -> CuResult<CB>,
243        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
244        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
245    ) -> CuResult<Self> {
246        let graph = config.get_graph(mission)?;
247        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
248            .get_all_nodes()
249            .iter()
250            .map(|(_, node)| node.get_instance_config())
251            .collect();
252
253        // TODO: make that configurable
254
255        let threadpool = Arc::new(
256            rayon::ThreadPoolBuilder::new()
257                .num_threads(2) // default to 4 threads if not specified
258                .build()
259                .expect("Could not create the threadpool"),
260        );
261
262        let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;
263        let monitor = monitor_instanciator(config);
264        let bridges = bridges_instanciator(config)?;
265
266        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
267            Some(logging_config) if logging_config.enable_task_logging => (
268                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
269                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
270                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
271            ),
272            Some(_) => (None, None, 0), // explicit no enable logging
273            None => (
274                // default
275                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
276                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
277                DEFAULT_KEYFRAME_INTERVAL,
278            ),
279        };
280
281        let copperlists_manager = CopperListsManager {
282            inner: CuListsManager::new(),
283            logger: copperlists_logger,
284        };
285
286        let keyframes_manager = KeyFramesManager {
287            inner: KeyFrame::new(),
288            logger: keyframes_logger,
289            keyframe_interval,
290        };
291
292        let runtime_config = config.runtime.clone().unwrap_or_default();
293
294        let runtime = Self {
295            tasks,
296            bridges,
297            threadpool,
298            monitor,
299            clock,
300            copperlists_manager,
301            keyframes_manager,
302            runtime_config,
303        };
304
305        Ok(runtime)
306    }
307
308    #[allow(clippy::too_many_arguments)]
309    #[cfg(not(feature = "std"))]
310    pub fn new(
311        clock: RobotClock,
312        config: &CuConfig,
313        mission: Option<&str>,
314        tasks_instanciator: impl for<'c> Fn(Vec<Option<&'c ComponentConfig>>) -> CuResult<CT>,
315        monitor_instanciator: impl Fn(&CuConfig) -> M,
316        bridges_instanciator: impl Fn(&CuConfig) -> CuResult<CB>,
317        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
318        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
319    ) -> CuResult<Self> {
320        let graph = config.get_graph(mission)?;
321        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
322            .get_all_nodes()
323            .iter()
324            .map(|(_, node)| node.get_instance_config())
325            .collect();
326
327        let tasks = tasks_instanciator(all_instances_configs)?;
328
329        let monitor = monitor_instanciator(config);
330        let bridges = bridges_instanciator(config)?;
331
332        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
333            Some(logging_config) if logging_config.enable_task_logging => (
334                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
335                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
336                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
337            ),
338            Some(_) => (None, None, 0), // explicit no enable logging
339            None => (
340                // default
341                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
342                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
343                DEFAULT_KEYFRAME_INTERVAL,
344            ),
345        };
346
347        let copperlists_manager = CopperListsManager {
348            inner: CuListsManager::new(),
349            logger: copperlists_logger,
350        };
351
352        let keyframes_manager = KeyFramesManager {
353            inner: KeyFrame::new(),
354            logger: keyframes_logger,
355            keyframe_interval,
356        };
357
358        let runtime_config = config.runtime.clone().unwrap_or_default();
359
360        let runtime = Self {
361            tasks,
362            bridges,
363            #[cfg(feature = "std")]
364            threadpool,
365            monitor,
366            clock,
367            copperlists_manager,
368            keyframes_manager,
369            runtime_config,
370        };
371
372        Ok(runtime)
373    }
374}
375
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
///
/// `find_task_type_for_id` derives the type from the graph: a node without
/// incoming neighbors is a source, otherwise a node without outgoing neighbors
/// is a sink, and anything else is regular.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// Only produces output messages.
    Source,
    /// Consumes input messages and produces output messages.
    Regular,
    /// Only consumes input messages.
    Sink,
}
386
/// This structure represents a single step of the execution plan: one task
/// together with the copper list slots it reads from and writes to.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// the index in the copper list of the output message and its type
    pub output_msg_index_type: Option<(u32, String)>,
}
402
403impl Debug for CuExecutionStep {
404    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
405        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
406        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
407        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
408        f.write_str(
409            format!(
410                "              input_msg_types: {:?}\n",
411                self.input_msg_indices_types
412            )
413            .as_str(),
414        )?;
415        f.write_str(
416            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
417        )?;
418        Ok(())
419    }
420}
421
/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (loops or steps) that are executed
/// multiple times.
/// If `loop_count` is `None`, the loop is infinite.
pub struct CuExecutionLoop {
    /// The units (steps or nested loops) making up the body of the loop, in order.
    pub steps: Vec<CuExecutionUnit>,
    /// How many times the body is executed; `None` means forever.
    pub loop_count: Option<u32>,
}
430
431impl Debug for CuExecutionLoop {
432    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
433        f.write_str("CuExecutionLoop:\n")?;
434        for step in &self.steps {
435            match step {
436                CuExecutionUnit::Step(step) => {
437                    step.fmt(f)?;
438                }
439                CuExecutionUnit::Loop(l) => {
440                    l.fmt(f)?;
441                }
442            }
443        }
444
445        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
446        Ok(())
447    }
448}
449
/// One element of an execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single task execution.
    Step(CuExecutionStep),
    /// A nested sequence of units executed repeatedly.
    Loop(CuExecutionLoop),
}
456
457fn find_output_index_type_from_nodeid(
458    node_id: NodeId,
459    steps: &Vec<CuExecutionUnit>,
460) -> Option<(u32, String)> {
461    for step in steps {
462        match step {
463            CuExecutionUnit::Loop(loop_unit) => {
464                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
465                    return Some(index);
466                }
467            }
468            CuExecutionUnit::Step(step) => {
469                if step.node_id == node_id {
470                    return step.output_msg_index_type.clone();
471                }
472            }
473        }
474    }
475    None
476}
477
478pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
479    if graph.incoming_neighbor_count(node_id) == 0 {
480        CuTaskType::Source
481    } else if graph.outgoing_neighbor_count(node_id) == 0 {
482        CuTaskType::Sink
483    } else {
484        CuTaskType::Regular
485    }
486}
487
488/// This function gets the input node by using the input step plan id, to get the edge that
489/// connects the input to the output in the config graph
490fn find_edge_with_plan_input_id(
491    plan: &[CuExecutionUnit],
492    graph: &CuGraph,
493    plan_id: u32,
494    output_node_id: NodeId,
495) -> usize {
496    let input_node = plan
497        .get(plan_id as usize)
498        .expect("Input step should've been added to plan before the step that receives the input");
499    let CuExecutionUnit::Step(input_step) = input_node else {
500        panic!("Expected input to be from a step, not a loop");
501    };
502    let input_node_id = input_step.node_id;
503
504    graph
505        .0
506        .edges_connecting(input_node_id.into(), output_node_id.into())
507        .map(|edge| edge.id().index())
508        .next()
509        .expect("An edge connecting the input to the output should exist")
510}
511
512/// The connection id used here is the index of the config graph edge that equates to the wanted
513/// connection
514fn sort_inputs_by_cnx_id(
515    input_msg_indices_types: &mut [(u32, String)],
516    plan: &[CuExecutionUnit],
517    graph: &CuGraph,
518    curr_node_id: NodeId,
519) {
520    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
521        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
522        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
523        a_edge_id.cmp(&b_edge_id)
524    });
525}
/// Explores a subbranch and builds the partial plan out of it.
///
/// Walks the graph breadth-first from `starting_point`. For every node visited:
/// - its task type is derived from the graph;
/// - a source gets the next free copper list index as output, typed with the
///   message of the first edge returned by `get_src_edges`;
/// - a sink or a regular task collects the output slot of each of its incoming
///   neighbors from `plan`. If one of them is not planned yet, the whole
///   function returns immediately, without consuming an index for that node;
/// - a sink still gets an output slot, typed `"()"`;
/// - the inputs are sorted by connection id, and the step is appended to `plan`.
///
/// Returns the next free copper list index and a flag telling whether at least
/// one node was handled by this call.
///
/// # Panics
/// Panics if a visited node cannot be found in the graph, or if a source or
/// regular node has no source edge / edge weight (see the FIXMEs below).
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        // A producer is not planned yet: give up on the rest of the branch.
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                // Sinks are given an output slot too, with the unit type as message type.
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        // A producer is not planned yet: give up on the rest of the branch.
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            // The node is already planned: move its step to the end of the plan and
            // refresh its inputs only.
            // NOTE(review): the step keeps its previous output slot, so the index
            // consumed above for this node is left unused, and the positions of the
            // steps that followed it in `plan` shift by one - confirm this is intended.
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}
647
/// This is the main heuristic to compute an execution plan at compilation time.
///
/// The work queue starts with every source node of `graph`. For each node taken
/// from the queue, the graph is walked breadth-first from it; every node met
/// that is not yet in the plan is handed to `plan_tasks_tree_branch`, which
/// plans as much of the branch below it as it can. When that call handled at
/// least one node, the neighbors of the node are queued for a later pass.
///
/// The result is a single infinite loop (`loop_count: None`) whose body is the
/// flat list of planned steps.
///
/// As written, this function has no failing path of its own: it always returns
/// `Ok`.
///
/// NOTE(review): `visited` is created here but nothing in this function ever
/// marks a node in it, so both `is_visited` checks below look like they are
/// always false - confirm whether marking the nodes was intended.
///
/// TODO(gbin): Make that heuristic pluggable.
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    // Seed the queue with all the source nodes of the graph.
    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            // Only top-level steps are checked; the plan built here never contains loops.
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!("      → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}
719
720//tests
721#[cfg(test)]
722mod tests {
723    use super::*;
724    use crate::config::Node;
725    use crate::cutask::CuSinkTask;
726    use crate::cutask::{CuSrcTask, Freezable};
727    use crate::monitoring::NoMonitor;
728    use bincode::Encode;
729    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
730    use serde_derive::Serialize;
731
    /// Stateless source task used as a test fixture: its output is `()` and
    /// both construction and processing always succeed.
    pub struct TestSource {}

    // Nothing to freeze: the default `Freezable` behavior is enough.
    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Output<'m> = ();
        /// Ignores the configuration and always succeeds.
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// Leaves the output message untouched and always succeeds.
        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }
753
    /// Stateless sink task used as a test fixture: its input is `()` and both
    /// construction and processing always succeed.
    pub struct TestSink {}

    // Nothing to freeze: the default `Freezable` behavior is enough.
    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Input<'m> = ();

        /// Ignores the configuration and always succeeds.
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// Ignores the input and always succeeds.
        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
772
    // The task tuple and the message tuple below are written by hand for the
    // tests; in an application they should be generated by the derive macro.
    type Tasks = (TestSource, TestSink);

    /// Hand-written copper list payload for the tests, wrapping a single `()`.
    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        /// Exposes no message at all.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        /// Reports no task id.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        /// Nothing to initialize for this payload.
        fn init_zeroed(&mut self) {}
    }
794
795    #[cfg(feature = "std")]
796    fn tasks_instanciator(
797        all_instances_configs: Vec<Option<&ComponentConfig>>,
798        _threadpool: Arc<ThreadPool>,
799    ) -> CuResult<Tasks> {
800        Ok((
801            TestSource::new(all_instances_configs[0])?,
802            TestSink::new(all_instances_configs[1])?,
803        ))
804    }
805
806    #[cfg(not(feature = "std"))]
807    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
808        Ok((
809            TestSource::new(all_instances_configs[0])?,
810            TestSink::new(all_instances_configs[1])?,
811        ))
812    }
813
    /// Test monitor factory: ignores the configuration and returns a `NoMonitor`.
    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }
817
    /// Test bridges factory: the tests use no bridge, so the bridge tuple is `()`.
    fn bridges_instanciator(_config: &CuConfig) -> CuResult<()> {
        Ok(())
    }
821
    /// Write stream that accepts any encodable object and discards it.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        /// Drops the object and reports success.
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
830
831    #[test]
832    fn test_runtime_instantiation() {
833        let mut config = CuConfig::default();
834        let graph = config.get_graph_mut(None).unwrap();
835        graph.add_node(Node::new("a", "TestSource")).unwrap();
836        graph.add_node(Node::new("b", "TestSink")).unwrap();
837        graph.connect(0, 1, "()").unwrap();
838        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
839            RobotClock::default(),
840            &config,
841            None,
842            tasks_instanciator,
843            monitor_instanciator,
844            bridges_instanciator,
845            FakeWriter {},
846            FakeWriter {},
847        );
848        assert!(runtime.is_ok());
849    }
850
851    #[test]
852    fn test_copperlists_manager_lifecycle() {
853        let mut config = CuConfig::default();
854        let graph = config.get_graph_mut(None).unwrap();
855        graph.add_node(Node::new("a", "TestSource")).unwrap();
856        graph.add_node(Node::new("b", "TestSink")).unwrap();
857        graph.connect(0, 1, "()").unwrap();
858
859        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
860            RobotClock::default(),
861            &config,
862            None,
863            tasks_instanciator,
864            monitor_instanciator,
865            bridges_instanciator,
866            FakeWriter {},
867            FakeWriter {},
868        )
869        .unwrap();
870
871        // Now emulates the generated runtime
872        {
873            let copperlists = &mut runtime.copperlists_manager;
874            let culist0 = copperlists
875                .inner
876                .create()
877                .expect("Ran out of space for copper lists");
878            // FIXME: error handling.
879            let id = culist0.id;
880            assert_eq!(id, 0);
881            culist0.change_state(CopperListState::Processing);
882            assert_eq!(copperlists.available_copper_lists(), 1);
883        }
884
885        {
886            let copperlists = &mut runtime.copperlists_manager;
887            let culist1 = copperlists
888                .inner
889                .create()
890                .expect("Ran out of space for copper lists"); // FIXME: error handling.
891            let id = culist1.id;
892            assert_eq!(id, 1);
893            culist1.change_state(CopperListState::Processing);
894            assert_eq!(copperlists.available_copper_lists(), 0);
895        }
896
897        {
898            let copperlists = &mut runtime.copperlists_manager;
899            let culist2 = copperlists.inner.create();
900            assert!(culist2.is_none());
901            assert_eq!(copperlists.available_copper_lists(), 0);
902            // Free in order, should let the top of the stack be serialized and freed.
903            let _ = copperlists.end_of_processing(1);
904            assert_eq!(copperlists.available_copper_lists(), 1);
905        }
906
907        // Readd a CL
908        {
909            let copperlists = &mut runtime.copperlists_manager;
910            let culist2 = copperlists
911                .inner
912                .create()
913                .expect("Ran out of space for copper lists"); // FIXME: error handling.
914            let id = culist2.id;
915            assert_eq!(id, 2);
916            culist2.change_state(CopperListState::Processing);
917            assert_eq!(copperlists.available_copper_lists(), 0);
918            // Free out of order, the #0 first
919            let _ = copperlists.end_of_processing(0);
920            // Should not free up the top of the stack
921            assert_eq!(copperlists.available_copper_lists(), 0);
922
923            // Free up the top of the stack
924            let _ = copperlists.end_of_processing(2);
925            // This should free up 2 CLs
926
927            assert_eq!(copperlists.available_copper_lists(), 2);
928        }
929    }
930
931    #[test]
932    fn test_runtime_task_input_order() {
933        let mut config = CuConfig::default();
934        let graph = config.get_graph_mut(None).unwrap();
935        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
936        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
937        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
938
939        assert_eq!(src1_id, 0);
940        assert_eq!(src2_id, 1);
941
942        // note that the source2 connection is before the source1
943        let src1_type = "src1_type";
944        let src2_type = "src2_type";
945        graph.connect(src2_id, sink_id, src2_type).unwrap();
946        graph.connect(src1_id, sink_id, src1_type).unwrap();
947
948        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
949        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
950        // the edge id depends on the order the connection is created, not
951        // on the node id, and that is what determines the input order
952        assert_eq!(src1_edge_id, 1);
953        assert_eq!(src2_edge_id, 0);
954
955        let runtime = compute_runtime_plan(graph).unwrap();
956        let sink_step = runtime
957            .steps
958            .iter()
959            .find_map(|step| match step {
960                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
961                _ => None,
962            })
963            .unwrap();
964
965        // since the src2 connection was added before src1 connection, the src2 type should be
966        // first
967        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
968        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
969    }
970
971    #[test]
972    fn test_runtime_plan_diamond_case1() {
973        // more complex topology that tripped the scheduler
974        let mut config = CuConfig::default();
975        let graph = config.get_graph_mut(None).unwrap();
976        let cam0_id = graph
977            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
978            .unwrap();
979        let inf0_id = graph
980            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
981            .unwrap();
982        let broadcast_id = graph
983            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
984            .unwrap();
985
986        // case 1 order
987        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
988        graph.connect(cam0_id, inf0_id, "i32").unwrap();
989        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
990
991        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
992        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
993
994        assert_eq!(edge_cam0_to_inf0, 0);
995        assert_eq!(edge_cam0_to_broadcast, 1);
996
997        let runtime = compute_runtime_plan(graph).unwrap();
998        let broadcast_step = runtime
999            .steps
1000            .iter()
1001            .find_map(|step| match step {
1002                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1003                _ => None,
1004            })
1005            .unwrap();
1006
1007        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1008        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1009    }
1010
1011    #[test]
1012    fn test_runtime_plan_diamond_case2() {
1013        // more complex topology that tripped the scheduler variation 2
1014        let mut config = CuConfig::default();
1015        let graph = config.get_graph_mut(None).unwrap();
1016        let cam0_id = graph
1017            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1018            .unwrap();
1019        let inf0_id = graph
1020            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1021            .unwrap();
1022        let broadcast_id = graph
1023            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1024            .unwrap();
1025
1026        // case 2 order
1027        graph.connect(cam0_id, inf0_id, "i32").unwrap();
1028        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1029        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1030
1031        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1032        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1033
1034        assert_eq!(edge_cam0_to_broadcast, 0);
1035        assert_eq!(edge_cam0_to_inf0, 1);
1036
1037        let runtime = compute_runtime_plan(graph).unwrap();
1038        let broadcast_step = runtime
1039            .steps
1040            .iter()
1041            .find_map(|step| match step {
1042                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1043                _ => None,
1044            })
1045            .unwrap();
1046
1047        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1048        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1049    }
1050}