cu29_runtime/curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in their application struct.
//!

use crate::config::{ComponentConfig, CuDirection, DEFAULT_KEYFRAME_INTERVAL, Node};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::{CuMonitor, build_monitor_topology};
use crate::resource::ResourceManager;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};

use alloc::boxed::Box;
use alloc::collections::VecDeque;
use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use bincode::enc::EncoderImpl;
use bincode::enc::write::{SizeWriter, SliceWriter};
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use core::fmt::Result as FmtResult;
use core::fmt::{Debug, Formatter};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;

#[cfg(feature = "std")]
use cu29_log_runtime::LoggerRuntime;
#[cfg(feature = "std")]
use cu29_unifiedlog::UnifiedLoggerWrite;
#[cfg(feature = "std")]
use std::sync::{Arc, Mutex};

/// Just a simple struct to hold the various bits needed to run a Copper application.
#[cfg(feature = "std")]
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}
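
// `end_of_processing` only frees copper lists from the top of the ring down: a list
// finishing out of order stays `DoneProcessing` until everything above it is done,
// which preserves serialization order in the log. A minimal sketch of the lifecycle,
// assuming a `manager` built by `CuRuntime::new` with a hypothetical payload tuple:
//
//     let culist = manager.inner.create().expect("ran out of copper lists");
//     culist.change_state(CopperListState::Processing);
//     let id = culist.id;
//     // ... execute the wave of tasks for this copper list ...
//     manager.end_of_processing(id)?; // logs (if a logger is set), frees, and pops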

/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks)
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}
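
// With an interval of, say, 100, only copper lists 0, 100, 200, ... produce a keyframe:
// `reset` clears the buffer, each task is appended via `freeze_task` as the wave
// executes, and `end_of_processing` logs the completed snapshot. For every other
// copper list id these calls are cheap no-ops.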

/// This is the main structure that will be injected as a member of the Application struct.
/// `CT` is the tuple of all the tasks in order of execution.
/// `CB` is the tuple of all the instantiated bridges.
/// `P` is the payload tuple of the copper list, representing the input/output messages for all the tasks.
/// `M` is the monitor implementation and `NBCL` the number of copper lists kept in flight.
pub struct CuRuntime<CT, CB, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// Tuple of all instantiated bridges.
    pub bridges: CB,

    /// Resource registry kept alive for tasks borrowing shared handles.
    pub resources: ResourceManager,

    /// The runtime monitoring.
    pub monitor: M,

    /// The manager for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The manager for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}
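
// For illustration, the `copper_runtime` macro monomorphizes this roughly as in the
// tests module below (names are from that module):
//
//     type Tasks = (TestSource, TestSink);                      // CT, in execution order
//     type Runtime = CuRuntime<Tasks, (), Msgs, NoMonitor, 2>;  // no bridges, 2 copper lists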

/// To be able to share the clock we make the runtime a clock provider.
impl<CT, CB, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, CB, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

/// A KeyFrame records a snapshot of the tasks' state just before a given copper list.
/// It is a double encapsulation: this struct records the culistid, and the tasks
/// themselves are bincode-encoded inside serialized_tasks.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    // This is the timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    // This is the bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Resets the keyframe in place so the buffer can be reused without reallocating.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Accumulates the serialized state of the tasks as they are executed along the wave.
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let cfg = bincode::config::standard();
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}
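
// `add_frozen_task` encodes twice on purpose: a first pass through `SizeWriter` only
// counts the bytes, the buffer is grown exactly once, then a second pass through
// `SliceWriter` serializes in place. This avoids a temporary `Vec` per task, at the
// cost of encoding each task twice, which is a reasonable trade for small task states.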

impl<
    CT,
    CB,
    P: CopperListTuple + CuListZeroedInit + Default + 'static,
    M: CuMonitor,
    const NBCL: usize,
> CuRuntime<CT, CB, P, M, NBCL>
{
    // FIXME(gbin): this became REALLY ugly with no-std
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let mut resources = resources_instanciator(config)?;

        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;
        let mut monitor = monitor_instanciator(config);
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };
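
        // Three outcomes: no `logging` section in the config -> log everything with the
        // default keyframe interval; task logging enabled -> use the configured interval;
        // task logging explicitly disabled -> no loggers, and keyframes are skipped entirely.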

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        resources_instanciator: impl Fn(&CuConfig) -> CuResult<ResourceManager>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            &mut ResourceManager,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        bridges_instanciator: impl Fn(&CuConfig, &mut ResourceManager) -> CuResult<CB>,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let mut resources = resources_instanciator(config)?;

        let tasks = tasks_instanciator(all_instances_configs, &mut resources)?;

        let mut monitor = monitor_instanciator(config);
        if let Ok(topology) = build_monitor_topology(config, mission) {
            monitor.set_topology(topology);
        }
        let bridges = bridges_instanciator(config, &mut resources)?;

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            bridges,
            resources,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}

/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// the index in the copper list of the output message and its type
    pub output_msg_index_type: Option<(u32, String)>,
}
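
// For example, a Regular step with `input_msg_indices_types = [(0, "i32")]` and
// `output_msg_index_type = Some((1, "f32"))` reads the `i32` at slot 0 of the copper
// list and writes its `f32` output into slot 1.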

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!("                  task_type: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!("                       task: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                "              input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!("       output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (loops or steps) that are executed
/// multiple times.
/// If loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

/// An execution unit is either a single step or a nested loop in the execution plan.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &Vec<CuExecutionUnit>,
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.incoming_neighbor_count(node_id) == 0 {
        CuTaskType::Source
    } else if graph.outgoing_neighbor_count(node_id) == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}
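
// For a hypothetical chain `camera -> filter -> motor`: `camera` has no incoming edges
// (Source), `motor` has no outgoing edges (Sink), and `filter` has both (Regular).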

/// Looks up the input step in the plan by its plan id, then finds the config graph edge
/// that connects that input node to the output node.
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

/// The connection id used here is the index of the config graph edge that corresponds to
/// the wanted connection.
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}
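
// Inputs are therefore ordered by the creation order of their connections in the config
// (the edge index), not by node id: if `connect(src2, sink, ..)` was called before
// `connect(src1, sink, ..)`, the sink sees src2's message first (see
// `test_runtime_task_input_order` below).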

/// Explores a sub-branch and builds the partial plan out of it.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeId> = graph.get_neighbor_ids(id, CuDirection::Incoming);
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in parents {
                    let pid = parent;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// This is the main heuristic to compute an execution plan at compilation time.
/// TODO(gbin): Make that heuristic pluggable.
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!("      → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}
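
// A minimal sketch of computing a plan (mirrors the tests below):
//
//     let mut config = CuConfig::default();
//     let graph = config.get_graph_mut(None).unwrap();
//     graph.add_node(Node::new("a", "TestSource")).unwrap();
//     graph.add_node(Node::new("b", "TestSink")).unwrap();
//     graph.connect(0, 1, "()").unwrap();
//     let plan = compute_runtime_plan(graph).unwrap(); // one Source step, then one Sink step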

// Tests
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Resources<'r> = ();
        type Output<'m> = ();
        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Resources<'r> = ();
        type Input<'m> = ();

        fn new_with(
            _config: Option<&ComponentConfig>,
            _resources: Self::Resources<'_>,
        ) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    // These would normally be generated by the derive macro.
    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _resources: &mut ResourceManager,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    fn bridges_instanciator(_config: &CuConfig, _resources: &mut ResourceManager) -> CuResult<()> {
        Ok(())
    }

    fn resources_instanciator(_config: &CuConfig) -> CuResult<ResourceManager> {
        Ok(ResourceManager::new(0))
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, (), Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            resources_instanciator,
            tasks_instanciator,
            monitor_instanciator,
            bridges_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulate the generated runtime.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order, should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Re-add a CL.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, the #0 first.
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack.
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Free up the top of the stack.
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs.
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // note that the source2 connection is made before the source1 connection
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // the edge id depends on the order the connection is created, not
        // on the node id, and that is what determines the input order
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // since the src2 connection was added before the src1 connection,
        // the src2 type should come first
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // more complex topology that tripped the scheduler
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // more complex topology that tripped the scheduler, variation 2
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}