cu29_runtime/
curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in their application struct.
//!

use crate::config::{ComponentConfig, CuDirection, Node, DEFAULT_KEYFRAME_INTERVAL};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::{build_monitor_topology, CuMonitor};
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};

use bincode::enc::write::{SizeWriter, SliceWriter};
use bincode::enc::EncoderImpl;
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use core::fmt::{Debug, Formatter};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;

#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::boxed::Box;
    pub use alloc::collections::VecDeque;
    pub use alloc::format;
    pub use alloc::string::String;
    pub use alloc::string::ToString;
    pub use alloc::vec::Vec;
    pub use core::fmt::Result as FmtResult;
}

#[cfg(feature = "std")]
mod imp {
    pub use cu29_log_runtime::LoggerRuntime;
    pub use cu29_unifiedlog::UnifiedLoggerWrite;
    pub use rayon::ThreadPool;
    pub use std::collections::VecDeque;
    pub use std::fmt::Result as FmtResult;
    pub use std::sync::{Arc, Mutex};
}

use imp::*;

/// Just a simple struct to hold the various bits needed to run a Copper application.
#[cfg(feature = "std")]
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
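    /// Marks the copper list `culistid` as done, then logs and frees every list that is
    /// contiguously done starting from the most recently created one. A hedged sketch of
    /// the out-of-order case (assuming NBCL = 2 and both lists are `Processing`; see
    /// `test_copperlists_manager_lifecycle` below):
    ///
    /// ```ignore
    /// manager.end_of_processing(0)?; // #0 done, but #1 is still processing: nothing freed
    /// manager.end_of_processing(1)?; // #1 done: #1 and the already-done #0 are freed
    /// ```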
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}

/// Manages the frozen tasks' state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks)
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only once every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
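    /// Whether `culistid` falls on a keyframe boundary (and a logger is configured).
    /// A hedged sketch of the cadence, assuming `keyframe_interval == 100`:
    ///
    /// ```ignore
    /// assert!(mgr.is_keyframe(0));    // 0 is a multiple of every interval
    /// assert!(!mgr.is_keyframe(42));
    /// assert!(mgr.is_keyframe(200));
    /// ```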
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}

/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// P is the payload type of the copper lists, representing the input/output messages for all the tasks.
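/// A hedged construction sketch, mirroring the unit tests below (the writers are any
/// `WriteStream` implementations; a generated application normally goes through the
/// `copper_runtime` macro instead):
///
/// ```ignore
/// let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
///     RobotClock::default(),
///     &config,
///     None, // default mission
///     tasks_instanciator,
///     monitor_instanciator,
///     bridges_instanciator,
///     copperlists_writer,
///     keyframes_writer,
/// )?;
/// ```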
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// Thread pool for background tasks.
    #[cfg(feature = "std")]
    pub threadpool: Arc<ThreadPool>,

    /// The runtime monitoring.
    pub monitor: M,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}

/// To share the clock, the runtime is made a clock provider.
impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

/// A KeyFrame records a snapshot of the tasks' state taken just before a given copper list.
/// It is a double encapsulation: this struct records the culistid, and the tasks themselves
/// are bincode-encoded inside serialized_tasks.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    // This is the timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    // This is the bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Resets the keyframe in place to avoid reallocations.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Accumulates tasks into the serialization buffer as they are executed, after each step.
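    /// This is a two-pass encode: a `SizeWriter` pass computes the exact byte count,
    /// the buffer is grown once, then a `SliceWriter` pass writes in place. A hedged
    /// sketch of the call pattern (`my_task` is any `Freezable`, `clock` a `RobotClock`):
    ///
    /// ```ignore
    /// let mut kf = KeyFrame::new();
    /// kf.reset(culistid, clock.now());
    /// let bytes_written = kf.add_frozen_task(&my_task)?; // appends to serialized_tasks
    /// ```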
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let cfg = bincode::config::standard();
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}

impl<
        CT,
        CB,
        P: CopperListTuple + CuListZeroedInit + Default + 'static,
        M: CuMonitor,
        const NBCL: usize,
    > CuRuntime<CT, CB, P, M, NBCL>
{
    // FIXME(gbin): this became REALLY ugly with no-std
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            Arc<ThreadPool>,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        // TODO: make the thread count configurable (hardcoded to 2 for now)
        let threadpool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(2)
                .build()
                .expect("Could not create the threadpool"),
        );

        let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;
        let mut monitor = monitor_instanciator(config);
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            threadpool,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl for<'c> Fn(Vec<Option<&'c ComponentConfig>>) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let tasks = tasks_instanciator(all_instances_configs)?;

        let mut monitor = monitor_instanciator(config);
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}

/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// the index in the copper list of the output message and its type
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                "              input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (loops or steps) that are executed
/// multiple times.
/// If loop_count is None, the loop is infinite.
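/// A hedged sketch of the two cases (field names per this struct):
///
/// ```ignore
/// let once = CuExecutionLoop { steps: vec![], loop_count: Some(1) };  // one iteration
/// let forever = CuExecutionLoop { steps: vec![], loop_count: None }; // runs indefinitely
/// ```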
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

/// A unit of the execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &[CuExecutionUnit],
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

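/// Classifies a node by its connectivity: no incoming edges makes it a `Source`,
/// no outgoing edges a `Sink`, anything else `Regular`. A hedged sketch, assuming
/// a graph wired a → b → c:
///
/// ```ignore
/// assert_eq!(find_task_type_for_id(graph, a), CuTaskType::Source);
/// assert_eq!(find_task_type_for_id(graph, b), CuTaskType::Regular);
/// assert_eq!(find_task_type_for_id(graph, c), CuTaskType::Sink);
/// ```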
pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.incoming_neighbor_count(node_id) == 0 {
        CuTaskType::Source
    } else if graph.outgoing_neighbor_count(node_id) == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}

/// Looks up the input step in the plan by its plan id, then finds the config-graph edge
/// that connects that input node to the output node.
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

/// The connection id used here is the index of the config-graph edge that corresponds to the
/// wanted connection.
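/// Edges get their ids in creation order, so inputs end up ordered by when their
/// connections were declared, not by node id. A hedged sketch (mirroring
/// `test_runtime_task_input_order` below):
///
/// ```ignore
/// // src2 → sink was connected before src1 → sink, so after sorting the sink's
/// // inputs are [src2's message, src1's message].
/// sort_inputs_by_cnx_id(&mut input_msg_indices_types, &plan, graph, sink_id);
/// ```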
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}

/// Explores a subbranch and builds the partial plan out of it.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// This is the main heuristic used to compute an execution plan at compile time.
/// TODO(gbin): Make this heuristic pluggable.
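/// A hedged usage sketch, mirroring the unit tests below ("MySource" and "MySink"
/// are hypothetical task types):
///
/// ```ignore
/// let mut config = CuConfig::default();
/// let graph = config.get_graph_mut(None)?;
/// let src = graph.add_node(Node::new("src", "MySource"))?;
/// let sink = graph.add_node(Node::new("sink", "MySink"))?;
/// graph.connect(src, sink, "u32")?;
/// let plan = compute_runtime_plan(graph)?; // BFS from the sources: [src, sink]
/// ```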
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!("      → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    // These would normally be generated by the derive macro
    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _threadpool: Arc<ThreadPool>,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    fn bridges_instanciator(_config: &CuConfig) -> CuResult<()> {
        Ok(())
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulate the generated runtime
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order, should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Re-add a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, the #0 first
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Free up the top of the stack
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Note that the source2 connection is made before the source1 connection.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // The edge id depends on the order in which connections are created, not on
        // the node id, and it is what determines the input order.
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Since the src2 connection was added before the src1 connection, the src2 type
        // should come first.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // A more complex topology that tripped the scheduler.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // A more complex topology that tripped the scheduler, variation 2.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
    }
}
1056}