cu29_runtime/
curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in their application struct.
//!

use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::{CuMonitor, build_monitor_topology};
use crate::resource::ResourceManager;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};

#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_log_derive::info;
#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_log_runtime::log;
#[cfg(all(target_os = "none", debug_assertions))]
#[allow(unused_imports)]
use cu29_log_runtime::log_debug_mode;
#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_value::to_value;

use alloc::boxed::Box;
use alloc::collections::VecDeque;
use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use bincode::enc::EncoderImpl;
use bincode::enc::write::{SizeWriter, SliceWriter};
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use core::fmt::Result as FmtResult;
use core::fmt::{Debug, Formatter};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;

#[cfg(feature = "std")]
use cu29_log_runtime::LoggerRuntime;
#[cfg(feature = "std")]
use cu29_unifiedlog::UnifiedLoggerWrite;
#[cfg(feature = "std")]
use std::sync::{Arc, Mutex};

/// Just a simple struct to hold the various bits needed to run a Copper application.
#[cfg(feature = "std")]
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
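    /// Marks the copper list `culistid` as done and frees every contiguous
    /// `DoneProcessing` list from the top of the ring, logging each one first if a
    /// logger is attached. A minimal sketch of the lifecycle, mirroring the unit
    /// tests at the bottom of this file (`manager` is an assumed
    /// `CopperListsManager` with a free slot):
    /// ```ignore
    /// let culist = manager.inner.create().expect("ran out of copper lists");
    /// let id = culist.id;
    /// culist.change_state(CopperListState::Processing);
    /// // ... the tasks of the wave run here ...
    /// manager.end_of_processing(id)?;
    /// ```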
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Number of copper list slots still available in the ring.
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}

/// Manages the frozen task states (keyframes) and their logging.
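/// A minimal sketch of the per-copper-list cadence (a snapshot is only captured
/// every `keyframe_interval` copper lists; off-keyframe calls are cheap no-ops).
/// `keyframes`, `culistid`, `clock`, and `task` are assumed to be in scope:
/// ```ignore
/// keyframes.reset(culistid, &clock);
/// keyframes.freeze_task(culistid, &task)?;
/// keyframes.end_of_processing(culistid)?;
/// ```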
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks)
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}

/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// CB is the tuple of all the instantiated bridges.
/// P is the payload type of the copper list, representing the input/output messages for all the tasks.
/// M is the monitor implementation and NBCL is the number of copper lists in flight.
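/// A minimal sketch of construction, mirroring `test_runtime_instantiation` at
/// the bottom of this file (`config` and the instanciator helpers are assumed
/// to be defined as in that test):
/// ```ignore
/// let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
///     RobotClock::default(),
///     &config,
///     None,
///     resources_instanciator,
///     tasks_instanciator,
///     monitor_instanciator,
///     bridges_instanciator,
///     FakeWriter {},
///     FakeWriter {},
/// )?;
/// ```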
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    pub resources: ResourceManager,

    /// The runtime monitoring.
    pub monitor: M,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}

/// To be able to share the clock, we make the runtime a clock provider.
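/// A minimal sketch: any holder of the runtime can grab a cheap clone of the
/// robot clock through this trait (`runtime` is an assumed `CuRuntime`):
/// ```ignore
/// let clock: RobotClock = runtime.get_clock();
/// let now = clock.now();
/// ```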
impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

/// A KeyFrame records a snapshot of the tasks' state just before a given copper list.
/// It is a double encapsulation: this struct records the culistid, and the tasks
/// themselves are bincode-encoded into serialized_tasks.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    // This is the timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    // This is the bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Resets in place so the existing buffer allocation can be reused (avoids reallocations).
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Accumulates tasks into the serialization buffer as they are executed along the wave of a copper list.
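    /// This uses bincode's two-pass pattern: a `SizeWriter` pass computes the exact
    /// byte count, then a `SliceWriter` pass encodes into the resized buffer.
    /// A minimal sketch of that pattern in isolation (`v` is an assumed `Encode` value):
    /// ```ignore
    /// let cfg = bincode::config::standard();
    /// let mut sizer = EncoderImpl::new(SizeWriter::default(), cfg);
    /// v.encode(&mut sizer)?;
    /// let need = sizer.into_writer().bytes_written as usize;
    /// let mut buf = vec![0u8; need];
    /// let mut enc = EncoderImpl::new(SliceWriter::new(&mut buf), cfg);
    /// v.encode(&mut enc)?;
    /// ```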
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let cfg = bincode::config::standard();
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}

impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    // FIXME(gbin): this became REALLY ugly with no-std
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let mut resources = resources_instanciator(config)?;

        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
        let mut monitor = monitor_instanciator(config);
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: get graph");
        let graph = config.get_graph(mission)?;
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: graph ok");
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: resources instanciator");
        let mut resources = resources_instanciator(config)?;

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: tasks instanciator");
        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator");
        let mut monitor = monitor_instanciator(config);
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator ok");
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: build monitor topology");
        if let Ok(topology) = build_monitor_topology(config, mission) {
            #[cfg(target_os = "none")]
            info!("CuRuntime::new: monitor topology ok");
            monitor.set_topology(topology);
            #[cfg(target_os = "none")]
            info!("CuRuntime::new: monitor topology set");
        }
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: bridges instanciator");
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}

/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// Node id of the task to execute
    pub node_id: NodeId,
    /// Node instance
    pub node: Node,
    /// Type of the task
    pub task_type: CuTaskType,

    /// The indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// The index in the copper list of the output message and its type
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                "              input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (loops or steps) that are executed
/// multiple times.
/// If loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

/// An execution unit of the plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &[CuExecutionUnit],
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

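/// Derives the task type of a node from its connectivity in the config graph:
/// no incoming edges makes a Source, no outgoing edges makes a Sink, anything
/// else is Regular. A minimal sketch on a two-node graph like the one in the
/// tests below (`graph` is assumed in scope, node 0 feeds node 1):
/// ```ignore
/// assert_eq!(find_task_type_for_id(graph, 0), CuTaskType::Source);
/// assert_eq!(find_task_type_for_id(graph, 1), CuTaskType::Sink);
/// ```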
pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.incoming_neighbor_count(node_id) == 0 {
        CuTaskType::Source
    } else if graph.outgoing_neighbor_count(node_id) == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}

/// Looks up the input node via its plan id, then finds the config-graph edge that
/// connects that input to the given output node.
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

/// The connection id used here is the index of the config-graph edge that corresponds
/// to the wanted connection.
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}

/// Explores a sub-branch and builds the partial plan out of it.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// This is the main heuristic used to compute an execution plan at compile time.
/// TODO(gbin): Make that heuristic pluggable.
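/// A minimal sketch, mirroring the unit tests below (a source "a" feeding a
/// sink "b" yields a two-step plan):
/// ```ignore
/// let mut config = CuConfig::default();
/// let graph = config.get_graph_mut(None).unwrap();
/// graph.add_node(Node::new("a", "TestSource")).unwrap();
/// graph.add_node(Node::new("b", "TestSink")).unwrap();
/// graph.connect(0, 1, "()").unwrap();
/// let plan = compute_runtime_plan(graph).unwrap();
/// assert_eq!(plan.steps.len(), 2);
/// ```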
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!("      → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}

// Tests
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    // These should be generated by the derive macro
    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(0))
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulate the generated runtime
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order: this should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Re-add a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, the #0 first.
            let _ = copperlists.end_of_processing(0);
            // This should not free up the top of the stack.
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Free up the top of the stack.
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs.
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Note that the source2 connection is created before the source1 one.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // The edge id depends on the order the connection is created, not
        // on the node id, and that is what determines the input order.
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Since the src2 connection was added before the src1 connection, the src2 type
        // should come first.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // A more complex topology that tripped the scheduler.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // A more complex topology that tripped the scheduler, variation 2.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}