cu29_runtime/
curuntime.rs

1//! CuRuntime is the heart of what copper is running on the robot.
2//! It is exposed to the user via the `copper_runtime` macro injecting it as a field in their application struct.
3//!
4
5use crate::config::{ComponentConfig, Node, DEFAULT_KEYFRAME_INTERVAL};
6use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
7use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
8use crate::cutask::{BincodeAdapter, Freezable};
9use crate::monitoring::CuMonitor;
10use cu29_clock::{ClockProvider, CuTime, RobotClock};
11use cu29_traits::CuResult;
12use cu29_traits::WriteStream;
13use cu29_traits::{CopperListTuple, CuError};
14
15use bincode::enc::write::{SizeWriter, SliceWriter};
16use bincode::enc::EncoderImpl;
17use bincode::error::EncodeError;
18use bincode::{Decode, Encode};
19use core::fmt::{Debug, Formatter};
20use petgraph::prelude::*;
21use petgraph::visit::VisitMap;
22use petgraph::visit::Visitable;
23
24#[cfg(not(feature = "std"))]
25mod imp {
26    pub use alloc::boxed::Box;
27    pub use alloc::collections::VecDeque;
28    pub use alloc::format;
29    pub use alloc::string::String;
30    pub use alloc::string::ToString;
31    pub use alloc::vec::Vec;
32    pub use core::fmt::Result as FmtResult;
33}
34
35#[cfg(feature = "std")]
36mod imp {
37    pub use cu29_log_runtime::LoggerRuntime;
38    pub use cu29_unifiedlog::UnifiedLoggerWrite;
39    pub use rayon::ThreadPool;
40    pub use std::collections::VecDeque;
41    pub use std::fmt::Result as FmtResult;
42    pub use std::sync::{Arc, Mutex};
43}
44
45use imp::*;
46
/// Just a simple struct to hold the various bits needed to run a Copper application:
/// the unified logger, the logger runtime and the robot clock.
///
/// Only compiled when the `std` feature is enabled.
#[cfg(feature = "std")]
pub struct CopperContext {
    /// Shared, mutex-protected handle to the unified log writer.
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// The `cu29_log_runtime::LoggerRuntime` instance of the application.
    pub logger_runtime: LoggerRuntime,
    /// The robot clock of the application.
    pub clock: RobotClock,
}
54
/// Manages the lifecycle of the copper lists and logging.
///
/// `P` is the payload tuple carried by each [`CopperList`]; `NBCL` is the const parameter
/// forwarded to the underlying [`CuListsManager`] and is used as the total number of
/// copper lists when computing how many are still available.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    /// The container holding the copper lists themselves.
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks).
    /// When `None`, finished copper lists are freed without being serialized.
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}
61
62impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
63    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
64        let mut is_top = true;
65        let mut nb_done = 0;
66        for cl in self.inner.iter_mut() {
67            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
68                cl.change_state(CopperListState::DoneProcessing);
69            }
70            if is_top && cl.get_state() == CopperListState::DoneProcessing {
71                if let Some(logger) = &mut self.logger {
72                    cl.change_state(CopperListState::BeingSerialized);
73                    logger.log(cl)?;
74                }
75                cl.change_state(CopperListState::Free);
76                nb_done += 1;
77            } else {
78                is_top = false;
79            }
80        }
81        for _ in 0..nb_done {
82            let _ = self.inner.pop();
83        }
84        Ok(())
85    }
86
87    pub fn available_copper_lists(&self) -> usize {
88        NBCL - self.inner.len()
89    }
90}
91
/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks).
    /// When `None`, no copper list id is ever treated as a keyframe.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only for copper list ids that are a multiple of this value.
    keyframe_interval: u32,
}
103
104impl KeyFramesManager {
105    fn is_keyframe(&self, culistid: u32) -> bool {
106        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
107    }
108
109    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
110        if self.is_keyframe(culistid) {
111            self.inner.reset(culistid, clock.now());
112        }
113    }
114
115    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
116        if self.is_keyframe(culistid) {
117            if self.inner.culistid != culistid {
118                panic!("Freezing task for a different culistid");
119            }
120            self.inner
121                .add_frozen_task(task)
122                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
123        } else {
124            Ok(0)
125        }
126    }
127
128    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
129        if self.is_keyframe(culistid) {
130            let logger = self.logger.as_mut().unwrap();
131            logger.log(&self.inner)
132        } else {
133            Ok(())
134        }
135    }
136}
137
/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// P is the payload tuple of the copper list, representing the input/output messages for all the tasks.
/// M is the type of the runtime monitor.
/// NBCL is the const parameter forwarded to the copper lists manager.
pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// For backgrounded tasks.
    #[cfg(feature = "std")]
    pub threadpool: Arc<ThreadPool>,

    /// The runtime monitoring.
    pub monitor: M,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}
164
165/// To be able to share the clock we make the runtime a clock provider.
166impl<CT, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
167    ClockProvider for CuRuntime<CT, P, M, NBCL>
168{
169    fn get_clock(&self) -> RobotClock {
170        self.clock.clone()
171    }
172}
173
/// A KeyFrame is recording a snapshot of the tasks state before a given copperlist.
/// It is a double encapsulation: this struct records the culistid and the timestamp, and
/// `serialized_tasks` holds a second, bincode-encoded layer with the state of the tasks.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    /// The id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    /// The timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    /// The bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}
186
187impl KeyFrame {
188    fn new() -> Self {
189        KeyFrame {
190            culistid: 0,
191            timestamp: CuTime::default(),
192            serialized_tasks: Vec::new(),
193        }
194    }
195
196    /// This is to be able to avoid reallocations
197    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
198        self.culistid = culistid;
199        self.timestamp = timestamp;
200        self.serialized_tasks.clear();
201    }
202
203    /// We need to be able to accumulate tasks to the serialization as they are executed after the step.
204    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
205        let cfg = bincode::config::standard();
206        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
207        BincodeAdapter(task).encode(&mut sizer)?;
208        let need = sizer.into_writer().bytes_written as usize;
209
210        let start = self.serialized_tasks.len();
211        self.serialized_tasks.resize(start + need, 0);
212        let mut enc =
213            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
214        BincodeAdapter(task).encode(&mut enc)?;
215        Ok(need)
216    }
217}
218
219impl<
220        CT,
221        P: CopperListTuple + CuListZeroedInit + Default + 'static,
222        M: CuMonitor,
223        const NBCL: usize,
224    > CuRuntime<CT, P, M, NBCL>
225{
226    // FIXME(gbin): this became REALLY ugly with no-std
227    #[cfg(feature = "std")]
228    pub fn new(
229        clock: RobotClock,
230        config: &CuConfig,
231        mission: Option<&str>,
232        tasks_instanciator: impl for<'c> Fn(
233            Vec<Option<&'c ComponentConfig>>,
234            Arc<ThreadPool>,
235        ) -> CuResult<CT>,
236        monitor_instanciator: impl Fn(&CuConfig) -> M,
237        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
238        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
239    ) -> CuResult<Self> {
240        let graph = config.get_graph(mission)?;
241        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
242            .get_all_nodes()
243            .iter()
244            .map(|(_, node)| node.get_instance_config())
245            .collect();
246
247        // TODO: make that configurable
248
249        let threadpool = Arc::new(
250            rayon::ThreadPoolBuilder::new()
251                .num_threads(2) // default to 4 threads if not specified
252                .build()
253                .expect("Could not create the threadpool"),
254        );
255
256        let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;
257        let monitor = monitor_instanciator(config);
258
259        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
260            Some(logging_config) if logging_config.enable_task_logging => (
261                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
262                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
263                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
264            ),
265            Some(_) => (None, None, 0), // explicit no enable logging
266            None => (
267                // default
268                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
269                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
270                DEFAULT_KEYFRAME_INTERVAL,
271            ),
272        };
273
274        let copperlists_manager = CopperListsManager {
275            inner: CuListsManager::new(),
276            logger: copperlists_logger,
277        };
278
279        let keyframes_manager = KeyFramesManager {
280            inner: KeyFrame::new(),
281            logger: keyframes_logger,
282            keyframe_interval,
283        };
284
285        let runtime_config = config.runtime.clone().unwrap_or_default();
286
287        let runtime = Self {
288            tasks,
289            threadpool,
290            monitor,
291            clock,
292            copperlists_manager,
293            keyframes_manager,
294            runtime_config,
295        };
296
297        Ok(runtime)
298    }
299
300    #[cfg(not(feature = "std"))]
301    pub fn new(
302        clock: RobotClock,
303        config: &CuConfig,
304        mission: Option<&str>,
305        tasks_instanciator: impl for<'c> Fn(Vec<Option<&'c ComponentConfig>>) -> CuResult<CT>,
306        monitor_instanciator: impl Fn(&CuConfig) -> M,
307        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
308        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
309    ) -> CuResult<Self> {
310        let graph = config.get_graph(mission)?;
311        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
312            .get_all_nodes()
313            .iter()
314            .map(|(_, node)| node.get_instance_config())
315            .collect();
316
317        let tasks = tasks_instanciator(all_instances_configs)?;
318
319        let monitor = monitor_instanciator(config);
320
321        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
322            Some(logging_config) if logging_config.enable_task_logging => (
323                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
324                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
325                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
326            ),
327            Some(_) => (None, None, 0), // explicit no enable logging
328            None => (
329                // default
330                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
331                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
332                DEFAULT_KEYFRAME_INTERVAL,
333            ),
334        };
335
336        let copperlists_manager = CopperListsManager {
337            inner: CuListsManager::new(),
338            logger: copperlists_logger,
339        };
340
341        let keyframes_manager = KeyFramesManager {
342            inner: KeyFrame::new(),
343            logger: keyframes_logger,
344            keyframe_interval,
345        };
346
347        let runtime_config = config.runtime.clone().unwrap_or_default();
348
349        let runtime = Self {
350            tasks,
351            #[cfg(feature = "std")]
352            threadpool,
353            monitor,
354            clock,
355            copperlists_manager,
356            keyframes_manager,
357            runtime_config,
358        };
359
360        Ok(runtime)
361    }
362}
363
/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    /// A node without any incoming connection in the graph.
    Source,
    /// A node with both incoming and outgoing connections.
    Regular,
    /// A node with incoming connections but no outgoing one.
    Sink,
}
374
/// This structure represents a step in the execution plan: one task to run, together
/// with where its inputs and its output live in the copper list.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// the index in the copper list of the output message and its type
    pub output_msg_index_type: Option<(u32, String)>,
}
390
391impl Debug for CuExecutionStep {
392    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
393        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
394        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
395        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
396        f.write_str(
397            format!(
398                "              input_msg_types: {:?}\n",
399                self.input_msg_indices_types
400            )
401            .as_str(),
402        )?;
403        f.write_str(
404            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
405        )?;
406        Ok(())
407    }
408}
409
/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of Execution units (loop or steps) that are executed
/// multiple times.
/// if loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    /// The units (steps or nested loops) making up the body of the loop, in order.
    pub steps: Vec<CuExecutionUnit>,
    /// How many times the body is executed; `None` means forever.
    pub loop_count: Option<u32>,
}
418
419impl Debug for CuExecutionLoop {
420    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
421        f.write_str("CuExecutionLoop:\n")?;
422        for step in &self.steps {
423            match step {
424                CuExecutionUnit::Step(step) => {
425                    step.fmt(f)?;
426                }
427                CuExecutionUnit::Loop(l) => {
428                    l.fmt(f)?;
429                }
430            }
431        }
432
433        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
434        Ok(())
435    }
436}
437
/// This enum represents one unit of the execution plan: either a single step or a
/// (possibly nested) loop of units.
#[derive(Debug)]
pub enum CuExecutionUnit {
    /// A single task execution.
    Step(CuExecutionStep),
    /// A sequence of units executed repeatedly.
    Loop(CuExecutionLoop),
}
444
445fn find_output_index_type_from_nodeid(
446    node_id: NodeId,
447    steps: &Vec<CuExecutionUnit>,
448) -> Option<(u32, String)> {
449    for step in steps {
450        match step {
451            CuExecutionUnit::Loop(loop_unit) => {
452                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
453                    return Some(index);
454                }
455            }
456            CuExecutionUnit::Step(step) => {
457                if step.node_id == node_id {
458                    return step.output_msg_index_type.clone();
459                }
460            }
461        }
462    }
463    None
464}
465
466pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
467    if graph.0.neighbors_directed(node_id.into(), Incoming).count() == 0 {
468        CuTaskType::Source
469    } else if graph.0.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
470        CuTaskType::Sink
471    } else {
472        CuTaskType::Regular
473    }
474}
475
476/// This function gets the input node by using the input step plan id, to get the edge that
477/// connects the input to the output in the config graph
478fn find_edge_with_plan_input_id(
479    plan: &[CuExecutionUnit],
480    graph: &CuGraph,
481    plan_id: u32,
482    output_node_id: NodeId,
483) -> usize {
484    let input_node = plan
485        .get(plan_id as usize)
486        .expect("Input step should've been added to plan before the step that receives the input");
487    let CuExecutionUnit::Step(input_step) = input_node else {
488        panic!("Expected input to be from a step, not a loop");
489    };
490    let input_node_id = input_step.node_id;
491
492    graph
493        .0
494        .edges_connecting(input_node_id.into(), output_node_id.into())
495        .map(|edge| edge.id().index())
496        .next()
497        .expect("An edge connecting the input to the output should exist")
498}
499
500/// The connection id used here is the index of the config graph edge that equates to the wanted
501/// connection
502fn sort_inputs_by_cnx_id(
503    input_msg_indices_types: &mut [(u32, String)],
504    plan: &[CuExecutionUnit],
505    graph: &CuGraph,
506    curr_node_id: NodeId,
507) {
508    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
509        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
510        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
511        a_edge_id.cmp(&b_edge_id)
512    });
513}
/// Explores a subbranch and build the partial plan out of it.
///
/// Walks the graph breadth first from `starting_point`. For every visited node, the inputs
/// are resolved from the outputs of its parents already present in `plan`, the next free
/// copper list index is reserved for its output, and a step is appended to `plan`.
///
/// The walk stops as soon as a sink or regular node has a parent that is not in the plan
/// yet.
///
/// Returns the next free copper list output index and whether at least one node was
/// planned during this call.
///
/// # Panics
///
/// Panics when a visited node is missing from the graph, or when a source or regular node
/// has no usable outgoing edge to take its output message type from.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                // The output message type is read from the first outgoing edge of the node.
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        // A parent has not been planned yet: give up on this branch for now.
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                // A sink still reserves an output slot, typed as the unit type.
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        // A parent has not been planned yet: give up on this branch for now.
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                // The output message type is read from the first outgoing edge of the node.
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        // Inputs are ordered by the index of the config graph edge bringing them in.
        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            // The node is already planned: move its step to the end of the plan and refresh
            // its inputs. The step keeps the output index it already had, even though a new
            // index has been reserved above.
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}
637
638/// This is the main heuristics to compute an execution plan at compilation time.
639/// TODO(gbin): Make that heuristic pluggable.
640pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
641    #[cfg(all(feature = "std", feature = "macro_debug"))]
642    eprintln!("[runtime plan]");
643    let visited = graph.0.visit_map();
644    let mut plan = Vec::new();
645    let mut next_culist_output_index = 0u32;
646
647    let mut queue: VecDeque<NodeId> = graph
648        .node_indices()
649        .iter()
650        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
651        .map(|node| node.index() as NodeId)
652        .collect();
653
654    #[cfg(all(feature = "std", feature = "macro_debug"))]
655    eprintln!("Initial source nodes: {queue:?}");
656
657    while let Some(start_node) = queue.pop_front() {
658        if visited.is_visited(&start_node) {
659            #[cfg(all(feature = "std", feature = "macro_debug"))]
660            eprintln!("→ Skipping already visited source {start_node}");
661            continue;
662        }
663
664        #[cfg(all(feature = "std", feature = "macro_debug"))]
665        eprintln!("→ Starting BFS from source {start_node}");
666        let mut bfs = Bfs::new(&graph.0, start_node.into());
667
668        while let Some(node_index) = bfs.next(&graph.0) {
669            let node_id = node_index.index() as NodeId;
670            let already_in_plan = plan
671                .iter()
672                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
673            if already_in_plan {
674                #[cfg(all(feature = "std", feature = "macro_debug"))]
675                eprintln!("    → Node {node_id} already planned, skipping");
676                continue;
677            }
678
679            #[cfg(all(feature = "std", feature = "macro_debug"))]
680            eprintln!("    Planning from node {node_id}");
681            let (new_index, handled) =
682                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
683            next_culist_output_index = new_index;
684
685            if !handled {
686                #[cfg(all(feature = "std", feature = "macro_debug"))]
687                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
688                continue;
689            }
690
691            #[cfg(all(feature = "std", feature = "macro_debug"))]
692            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
693            for neighbor in graph.0.neighbors(node_index) {
694                if !visited.is_visited(&neighbor) {
695                    let nid = neighbor.index() as NodeId;
696                    #[cfg(all(feature = "std", feature = "macro_debug"))]
697                    eprintln!("      → Enqueueing neighbor {nid}");
698                    queue.push_back(nid);
699                }
700            }
701        }
702    }
703
704    Ok(CuExecutionLoop {
705        steps: plan,
706        loop_count: None,
707    })
708}
709
710//tests
711#[cfg(test)]
712mod tests {
713    use super::*;
714    use crate::config::Node;
715    use crate::cutask::CuSinkTask;
716    use crate::cutask::{CuSrcTask, Freezable};
717    use crate::monitoring::NoMonitor;
718    use bincode::Encode;
719    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
720    use serde_derive::Serialize;
721
    /// Stateless source task used as a fixture by the tests.
    pub struct TestSource {}

    // Empty impl: only the provided methods of `Freezable` are used.
    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        /// The fixture produces the unit type.
        type Output<'m> = ();

        /// Ignores the configuration and always succeeds.
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// Does nothing and always succeeds.
        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }
743
    /// Stateless sink task used as a fixture by the tests.
    pub struct TestSink {}

    // Empty impl: only the provided methods of `Freezable` are used.
    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        /// The fixture consumes the unit type.
        type Input<'m> = ();

        /// Ignores the configuration and always succeeds.
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        /// Does nothing and always succeeds.
        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }
762
    // Those should be generated by the derive macro
    /// The tasks tuple of the test application: one source followed by one sink.
    type Tasks = (TestSource, TestSink);

    /// The copper list payload of the test application: a wrapper around the unit type.
    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        /// The test payload exposes no message.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        /// The test payload declares no task id.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        /// Nothing to initialize for the unit payload.
        fn init_zeroed(&mut self) {}
    }
784
785    #[cfg(feature = "std")]
786    fn tasks_instanciator(
787        all_instances_configs: Vec<Option<&ComponentConfig>>,
788        _threadpool: Arc<ThreadPool>,
789    ) -> CuResult<Tasks> {
790        Ok((
791            TestSource::new(all_instances_configs[0])?,
792            TestSink::new(all_instances_configs[1])?,
793        ))
794    }
795
796    #[cfg(not(feature = "std"))]
797    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
798        Ok((
799            TestSource::new(all_instances_configs[0])?,
800            TestSink::new(all_instances_configs[1])?,
801        ))
802    }
803
    /// Builds the monitor of the test application: a `NoMonitor`, whatever the configuration.
    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }
807
    /// Write stream used by the tests: it accepts any encodable object and discards it.
    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        /// Ignores the object and always succeeds.
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }
816
817    #[test]
818    fn test_runtime_instantiation() {
819        let mut config = CuConfig::default();
820        let graph = config.get_graph_mut(None).unwrap();
821        graph.add_node(Node::new("a", "TestSource")).unwrap();
822        graph.add_node(Node::new("b", "TestSink")).unwrap();
823        graph.connect(0, 1, "()").unwrap();
824        let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
825            RobotClock::default(),
826            &config,
827            None,
828            tasks_instanciator,
829            monitor_instanciator,
830            FakeWriter {},
831            FakeWriter {},
832        );
833        assert!(runtime.is_ok());
834    }
835
836    #[test]
837    fn test_copperlists_manager_lifecycle() {
838        let mut config = CuConfig::default();
839        let graph = config.get_graph_mut(None).unwrap();
840        graph.add_node(Node::new("a", "TestSource")).unwrap();
841        graph.add_node(Node::new("b", "TestSink")).unwrap();
842        graph.connect(0, 1, "()").unwrap();
843
844        let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
845            RobotClock::default(),
846            &config,
847            None,
848            tasks_instanciator,
849            monitor_instanciator,
850            FakeWriter {},
851            FakeWriter {},
852        )
853        .unwrap();
854
855        // Now emulates the generated runtime
856        {
857            let copperlists = &mut runtime.copperlists_manager;
858            let culist0 = copperlists
859                .inner
860                .create()
861                .expect("Ran out of space for copper lists");
862            // FIXME: error handling.
863            let id = culist0.id;
864            assert_eq!(id, 0);
865            culist0.change_state(CopperListState::Processing);
866            assert_eq!(copperlists.available_copper_lists(), 1);
867        }
868
869        {
870            let copperlists = &mut runtime.copperlists_manager;
871            let culist1 = copperlists
872                .inner
873                .create()
874                .expect("Ran out of space for copper lists"); // FIXME: error handling.
875            let id = culist1.id;
876            assert_eq!(id, 1);
877            culist1.change_state(CopperListState::Processing);
878            assert_eq!(copperlists.available_copper_lists(), 0);
879        }
880
881        {
882            let copperlists = &mut runtime.copperlists_manager;
883            let culist2 = copperlists.inner.create();
884            assert!(culist2.is_none());
885            assert_eq!(copperlists.available_copper_lists(), 0);
886            // Free in order, should let the top of the stack be serialized and freed.
887            let _ = copperlists.end_of_processing(1);
888            assert_eq!(copperlists.available_copper_lists(), 1);
889        }
890
891        // Readd a CL
892        {
893            let copperlists = &mut runtime.copperlists_manager;
894            let culist2 = copperlists
895                .inner
896                .create()
897                .expect("Ran out of space for copper lists"); // FIXME: error handling.
898            let id = culist2.id;
899            assert_eq!(id, 2);
900            culist2.change_state(CopperListState::Processing);
901            assert_eq!(copperlists.available_copper_lists(), 0);
902            // Free out of order, the #0 first
903            let _ = copperlists.end_of_processing(0);
904            // Should not free up the top of the stack
905            assert_eq!(copperlists.available_copper_lists(), 0);
906
907            // Free up the top of the stack
908            let _ = copperlists.end_of_processing(2);
909            // This should free up 2 CLs
910
911            assert_eq!(copperlists.available_copper_lists(), 2);
912        }
913    }
914
915    #[test]
916    fn test_runtime_task_input_order() {
917        let mut config = CuConfig::default();
918        let graph = config.get_graph_mut(None).unwrap();
919        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
920        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
921        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();
922
923        assert_eq!(src1_id, 0);
924        assert_eq!(src2_id, 1);
925
926        // note that the source2 connection is before the source1
927        let src1_type = "src1_type";
928        let src2_type = "src2_type";
929        graph.connect(src2_id, sink_id, src2_type).unwrap();
930        graph.connect(src1_id, sink_id, src1_type).unwrap();
931
932        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
933        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
934        // the edge id depends on the order the connection is created, not
935        // on the node id, and that is what determines the input order
936        assert_eq!(src1_edge_id, 1);
937        assert_eq!(src2_edge_id, 0);
938
939        let runtime = compute_runtime_plan(graph).unwrap();
940        let sink_step = runtime
941            .steps
942            .iter()
943            .find_map(|step| match step {
944                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
945                _ => None,
946            })
947            .unwrap();
948
949        // since the src2 connection was added before src1 connection, the src2 type should be
950        // first
951        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
952        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
953    }
954
955    #[test]
956    fn test_runtime_plan_diamond_case1() {
957        // more complex topology that tripped the scheduler
958        let mut config = CuConfig::default();
959        let graph = config.get_graph_mut(None).unwrap();
960        let cam0_id = graph
961            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
962            .unwrap();
963        let inf0_id = graph
964            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
965            .unwrap();
966        let broadcast_id = graph
967            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
968            .unwrap();
969
970        // case 1 order
971        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
972        graph.connect(cam0_id, inf0_id, "i32").unwrap();
973        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
974
975        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
976        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];
977
978        assert_eq!(edge_cam0_to_inf0, 0);
979        assert_eq!(edge_cam0_to_broadcast, 1);
980
981        let runtime = compute_runtime_plan(graph).unwrap();
982        let broadcast_step = runtime
983            .steps
984            .iter()
985            .find_map(|step| match step {
986                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
987                _ => None,
988            })
989            .unwrap();
990
991        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
992        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
993    }
994
995    #[test]
996    fn test_runtime_plan_diamond_case2() {
997        // more complex topology that tripped the scheduler variation 2
998        let mut config = CuConfig::default();
999        let graph = config.get_graph_mut(None).unwrap();
1000        let cam0_id = graph
1001            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
1002            .unwrap();
1003        let inf0_id = graph
1004            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
1005            .unwrap();
1006        let broadcast_id = graph
1007            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
1008            .unwrap();
1009
1010        // case 2 order
1011        graph.connect(cam0_id, inf0_id, "i32").unwrap();
1012        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
1013        graph.connect(inf0_id, broadcast_id, "f32").unwrap();
1014
1015        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
1016        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];
1017
1018        assert_eq!(edge_cam0_to_broadcast, 0);
1019        assert_eq!(edge_cam0_to_inf0, 1);
1020
1021        let runtime = compute_runtime_plan(graph).unwrap();
1022        let broadcast_step = runtime
1023            .steps
1024            .iter()
1025            .find_map(|step| match step {
1026                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
1027                _ => None,
1028            })
1029            .unwrap();
1030
1031        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
1032        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
1033    }
1034}