cu29_runtime/curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in their application struct.
//!

use crate::config::{ComponentConfig, CuDirection, Node, DEFAULT_KEYFRAME_INTERVAL};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::CuMonitor;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};

use bincode::enc::write::{SizeWriter, SliceWriter};
use bincode::enc::EncoderImpl;
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use core::fmt::{Debug, Formatter};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;

#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::boxed::Box;
    pub use alloc::collections::VecDeque;
    pub use alloc::format;
    pub use alloc::string::String;
    pub use alloc::string::ToString;
    pub use alloc::vec::Vec;
    pub use core::fmt::Result as FmtResult;
}

#[cfg(feature = "std")]
mod imp {
    pub use cu29_log_runtime::LoggerRuntime;
    pub use cu29_unifiedlog::UnifiedLoggerWrite;
    pub use rayon::ThreadPool;
    pub use std::collections::VecDeque;
    pub use std::fmt::Result as FmtResult;
    pub use std::sync::{Arc, Mutex};
}

use imp::*;

/// Just a simple struct to hold the various bits needed to run a Copper application.
#[cfg(feature = "std")]
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// Manages the lifecycle of the copper lists and logging.
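///
/// A typical cycle, mirroring the lifecycle unit test at the bottom of this file
/// (a sketch; error handling elided):
/// ```ignore
/// let culist = manager.inner.create().expect("ran out of copper lists");
/// let id = culist.id;
/// culist.change_state(CopperListState::Processing);
/// // ... the tasks fill the copper list ...
/// manager.end_of_processing(id)?;
/// ```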
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            // Mark the targeted copper list as done.
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            // Copper lists can only be freed from the top of the stack down, so only
            // the contiguous run of done lists at the top gets serialized and freed;
            // this keeps the log in order.
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Number of copper lists still available in the pool.
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}

/// Manages the frozen tasks state and logging.
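///
/// With `keyframe_interval == 100`, copper lists 0, 100, 200, … trigger a keyframe
/// (see `is_keyframe` below).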
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks)
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

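    /// Starts a new keyframe for this copper list if it falls on the keyframe interval.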
    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

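    /// Appends the frozen state of a task to the current keyframe and returns the
    /// number of bytes written (0 if this copper list is not a keyframe).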
    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            // is_keyframe() guarantees the logger is present.
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}

/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// P is the copper list payload type, representing the input/output messages for all the tasks.
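///
/// Construction is normally generated by the `copper_runtime` macro; a hand-rolled
/// call looks like the unit test at the bottom of this file (a sketch):
/// ```ignore
/// let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
///     RobotClock::default(),
///     &config,
///     None, // mission
///     tasks_instanciator,
///     monitor_instanciator,
///     copperlists_logger, // impl WriteStream<CopperList<P>>
///     keyframes_logger,   // impl WriteStream<KeyFrame>
/// )?;
/// ```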
pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// For backgrounded tasks.
    #[cfg(feature = "std")]
    pub threadpool: Arc<ThreadPool>,

    /// The runtime monitoring.
    pub monitor: M,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}

/// To be able to share the clock we make the runtime a clock provider.
impl<CT, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

/// A KeyFrame records a snapshot of the tasks state right before a given copper list.
/// It is a double encapsulation: this struct records the culistid and timestamp, while
/// the tasks themselves are bincode-encoded one level deeper in `serialized_tasks`.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    // This is the timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    // This is the bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Resets the keyframe in place to avoid reallocations.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Accumulates the serialized state of the tasks as they are executed after each step.
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let cfg = bincode::config::standard();
        // First pass: measure the encoded size without writing anything.
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        // Second pass: grow the buffer once, then encode directly into the new tail.
        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}

impl<
        CT,
        P: CopperListTuple + CuListZeroedInit + Default + 'static,
        M: CuMonitor,
        const NBCL: usize,
    > CuRuntime<CT, P, M, NBCL>
{
    // FIXME(gbin): this became REALLY ugly with no-std
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            Arc<ThreadPool>,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        // TODO: make the thread count configurable
        let threadpool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(2) // hardcoded to 2 threads for now
                .build()
                .expect("Could not create the threadpool"),
        );

        let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;
        let monitor = monitor_instanciator(config);

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            threadpool,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl for<'c> Fn(Vec<Option<&'c ComponentConfig>>) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let tasks = tasks_instanciator(all_instances_configs)?;

        let monitor = monitor_instanciator(config);

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}

/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// Node id of the task to execute
    pub node_id: NodeId,
    /// Node instance
    pub node: Node,
    /// Type of the task
    pub task_type: CuTaskType,

    /// The indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// The index in the copper list of the output message and its type
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                "              input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (loops or steps) that are executed
/// multiple times.
/// If loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

/// A unit in the execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &[CuExecutionUnit],
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

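/// Classifies a node by its connectivity in the config graph: no incoming edges
/// makes it a Source, no outgoing edges a Sink, anything else a Regular task.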
pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.incoming_neighbor_count(node_id) == 0 {
        CuTaskType::Source
    } else if graph.outgoing_neighbor_count(node_id) == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}

/// Looks up the input step by its plan id, then finds the config-graph edge that
/// connects that input to the given output node.
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

/// The connection id used here is the index of the config-graph edge that corresponds to
/// the wanted connection.
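/// Sorting by edge id means inputs end up in the order their connections were declared
/// in the config (edge ids grow in creation order), not in node-id order; see
/// `test_runtime_task_input_order` below.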
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}

/// Explores a subbranch and builds the partial plan out of it.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// This is the main heuristic to compute an execution plan at compilation time.
/// TODO(gbin): Make that heuristic pluggable.
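///
/// In short: BFS from every source node. A branch bails out early when it reaches a
/// join whose other inputs are not planned yet; the join is then completed by a later
/// branch, and an already-planned step that gains new inputs is moved to the back of
/// the plan so it runs after all of its producers.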
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!("      → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}

// Tests
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    // These would normally be generated by the derive macro
    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _threadpool: Arc<ThreadPool>,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulate the generated runtime
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order: this should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Re-add a copper list
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, the #0 first.
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack.
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Freeing up the top of the stack should free up 2 copper lists.
            let _ = copperlists.end_of_processing(2);
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Note that the source2 connection is made before the source1 connection.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // The edge id depends on the order the connections are created, not
        // on the node id, and that is what determines the input order.
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Since the src2 connection was added before the src1 connection, the src2 type
        // should come first.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // more complex topology that tripped the scheduler
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // more complex topology that tripped the scheduler, variation 2
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}
1032}