cu29_runtime/
curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in the application struct.
//!

use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::{CuMonitor, build_monitor_topology};
use crate::resource::ResourceManager;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};

#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_log::{ANONYMOUS, CuLogEntry, CuLogLevel};
#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_log_derive::info;
#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_log_runtime::log;
#[cfg(all(target_os = "none", debug_assertions))]
#[allow(unused_imports)]
use cu29_log_runtime::log_debug_mode;
#[cfg(target_os = "none")]
#[allow(unused_imports)]
use cu29_value::to_value;

use alloc::boxed::Box;
use alloc::collections::VecDeque;
use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use bincode::enc::EncoderImpl;
use bincode::enc::write::{SizeWriter, SliceWriter};
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use core::fmt::Result as FmtResult;
use core::fmt::{Debug, Formatter};

#[cfg(feature = "std")]
use cu29_log_runtime::LoggerRuntime;
#[cfg(feature = "std")]
use cu29_unifiedlog::UnifiedLoggerWrite;
#[cfg(feature = "std")]
use std::sync::{Arc, Mutex};

/// Just a simple struct to hold the various bits needed to run a Copper application.
#[cfg(feature = "std")]
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}
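
// Illustrative walkthrough (added note, not part of the original source) of the freeing
// policy in `end_of_processing` above: copper lists are only reclaimed from the top of
// the stack down, so a list that finishes out of order stays allocated until everything
// created after it is also done. With NBCL = 2:
//
//   create() -> id 0, create() -> id 1   // two lists in flight
//   end_of_processing(0)                 // 0 is not on top: nothing is freed yet
//   end_of_processing(1)                 // 1 (top) then 0 are serialized and freed
//
// See `test_copperlists_manager_lifecycle` at the bottom of this file for the
// executable version of this scenario.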

/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored, following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks)
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}
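
// Note (added for clarity, not in the original source): keyframes are sampled, not
// continuous. Only copper lists whose id is a multiple of `keyframe_interval` produce
// one; e.g. assuming an interval of 100, culistids 0, 100, 200, ... are serialized and
// logged, while `freeze_task` on any other culistid returns Ok(0) without touching the
// buffer.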

/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// CB is the tuple of all the instantiated bridges.
/// P is the copper list payload, representing the input/output messages for all the tasks.
/// M is the monitor implementation.
/// NBCL is the number of copper lists kept in flight.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    pub resources: ResourceManager,

    /// The runtime monitoring.
    pub monitor: M,

    /// Manages the copper lists (messages between tasks) and their logger.
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// Manages the keyframes (frozen task states) and their logger.
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}

/// To be able to share the clock, we make the runtime a clock provider.
impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

/// A KeyFrame records a snapshot of the tasks' state just before a given copper list.
/// It is a double encapsulation: this struct records the culistid, while the tasks
/// themselves are bincode-encoded a second time inside serialized_tasks.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    // This is the timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    // This is the bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}

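// Wire-layout sketch (illustrative, inferred from the derive(Encode) field order above;
// not a format guarantee): a serialized KeyFrame looks like
//
//   [culistid: u32][timestamp: CuTime][len(serialized_tasks)][task0 bytes][task1 bytes]...
//
// where each `taskN bytes` region is itself an independent bincode encoding of one
// task's frozen state, appended in execution order by `add_frozen_task` below.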
impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Resets the keyframe in place to avoid reallocations.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Accumulates tasks into the serialization buffer as they are executed, after each step.
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let cfg = bincode::config::standard();
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}
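
// The two-pass encode used by `add_frozen_task` is a generally useful pattern with
// bincode 2: measure with a `SizeWriter`, then encode into a pre-sized slice, so the
// buffer is grown exactly once per task. A minimal self-contained sketch of the same
// pattern on any `Encode` type (illustrative only; `append_encoded` is not part of the
// original API):
#[allow(dead_code)]
fn append_encoded<T: Encode>(buf: &mut Vec<u8>, value: &T) -> Result<usize, EncodeError> {
    let cfg = bincode::config::standard();
    // Pass 1: dry-run encode that only counts bytes.
    let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
    value.encode(&mut sizer)?;
    let need = sizer.into_writer().bytes_written as usize;

    // Pass 2: reserve exactly `need` bytes and encode into them in place.
    let start = buf.len();
    buf.resize(start + need, 0);
    let mut enc = EncoderImpl::<_, _>::new(SliceWriter::new(&mut buf[start..]), cfg);
    value.encode(&mut enc)?;
    Ok(need)
}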

impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    // FIXME(gbin): this became REALLY ugly with no-std
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let resources = resources_instanciator(config)?;
        Self::new_with_resources(
            clock,
            config,
            mission,
            resources,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            copperlists_logger,
            keyframes_logger,
        )
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new_with_resources(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        mut resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
        let mut monitor = monitor_instanciator(config);
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default: task logging enabled
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: resources instanciator");
        let resources = resources_instanciator(config)?;
        Self::new_with_resources(
            clock,
            config,
            mission,
            resources,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            copperlists_logger,
            keyframes_logger,
        )
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new_with_resources(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        mut resources: ResourceManager,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: get graph");
        let graph = config.get_graph(mission)?;
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: graph ok");
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: tasks instanciator");
        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;

        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator");
        let mut monitor = monitor_instanciator(config);
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: monitor instanciator ok");
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: build monitor topology");
        if let Ok(topology) = build_monitor_topology(config, mission) {
            #[cfg(target_os = "none")]
            info!("CuRuntime::new: monitor topology ok");
            monitor.set_topology(topology);
            #[cfg(target_os = "none")]
            info!("CuRuntime::new: monitor topology set");
        }
        #[cfg(target_os = "none")]
        info!("CuRuntime::new: bridges instanciator");
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default: task logging enabled
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };
        #[cfg(target_os = "none")]
        {
            let cl_size = core::mem::size_of::<CopperList<P>>();
            let total_bytes = cl_size.saturating_mul(NBCL);
            info!(
                "CuRuntime::new: copperlists count={} cl_size={} total_bytes={}",
                NBCL, cl_size, total_bytes
            );
        }

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}
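
// Construction sketch (adapted from `test_runtime_instantiation` below; the
// `*_instanciator` functions and `FakeWriter` are test stand-ins, not production code):
//
//   let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
//       RobotClock::default(),
//       &config,
//       None, // mission
//       resources_instanciator,
//       tasks_instanciator,
//       monitor_instanciator,
//       bridges_instanciator,
//       FakeWriter {}, // copper lists logger
//       FakeWriter {}, // keyframes logger
//   )?;
//
// In a real application the `copper_runtime` macro generates the equivalent call with
// the concrete task tuple and the unified logger streams.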

/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers).
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// The indices in the copper list of the input messages and their types.
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// The index in the copper list of the output message and its type.
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                "              input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (loops or steps) that are executed
/// multiple times.
/// If loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

/// This enum represents a unit in the execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &Vec<CuExecutionUnit>,
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.incoming_neighbor_count(node_id) == 0 {
        CuTaskType::Source
    } else if graph.outgoing_neighbor_count(node_id) == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}
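
// Quick illustration (added note, not in the original source), for a linear graph
// a -> b -> c: `a` has no incoming edges so it is a Source, `c` has no outgoing edges
// so it is a Sink, and `b` has both so it is Regular. Note that an isolated node with
// no edges at all classifies as a Source, since the incoming count is checked first.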

/// Looks up the input step in the plan by its plan id and returns the id of the config
/// graph edge that connects that input to the given output node.
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .edge_id_between(input_node_id, output_node_id)
        .expect("An edge connecting the input to the output should exist")
}

/// The connection id used here is the index of the config graph edge that corresponds
/// to the wanted connection.
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}
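
// Illustrative consequence (see `test_runtime_task_input_order` below): if
// `graph.connect(src2, sink, ...)` is called before `graph.connect(src1, sink, ...)`,
// the src2 edge gets the lower edge id, so the sink's inputs come out ordered
// (src2, src1) regardless of node ids or of the BFS visit order.
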
/// Explores a subbranch and builds the partial plan out of it.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut handled = false;

    for id in graph.bfs_nodes(starting_point) {
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                let edge_id = graph.get_src_edges(id).unwrap()[0];
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .edge(edge_id)
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                let edge_id = graph.get_src_edges(id).unwrap()[0];
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .edge(edge_id) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// This is the main heuristic used to compute an execution plan at compilation time.
/// TODO(gbin): Make that heuristic pluggable.
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: VecDeque<NodeId> = graph
        .node_ids()
        .into_iter()
        .filter(|&node_id| find_task_type_for_id(graph, node_id) == CuTaskType::Source)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        for node_id in graph.bfs_nodes(start_node) {
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.get_neighbor_ids(node_id, CuDirection::Outgoing) {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("      → Enqueueing neighbor {neighbor}");
                queue.push_back(neighbor);
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}
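
// Minimal usage sketch (mirrors the tests below; the task type strings are placeholder
// names, not real tasks):
//
//   let mut config = CuConfig::default();
//   let graph = config.get_graph_mut(None).unwrap();
//   let src = graph.add_node(Node::new("src", "tasks::MySource")).unwrap();
//   let sink = graph.add_node(Node::new("sink", "tasks::MySink")).unwrap();
//   graph.connect(src, sink, "u32").unwrap();
//
//   let plan = compute_runtime_plan(graph).unwrap();
//
// Sources are seeded into the queue first, so `plan.steps` comes out [src, sink]:
// the source writes copper-list slot 0 and the sink reads it.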

// tests
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    // These should be generated by the derive macro
    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(&[]))
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulate the generated runtime
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order, should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Re-add a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, the #0 first
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Free up the top of the stack
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Note that the source2 connection is made before the source1 connection.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // The edge id depends on the order in which the connections are created, not
        // on the node ids, and that is what determines the input order.
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Since the src2 connection was added before the src1 connection, the src2 type
        // should come first.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // A more complex topology that tripped the scheduler.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // Variation 2 of the more complex topology that tripped the scheduler.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}