cu29_runtime/curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in their application struct.
//!

use crate::config::{ComponentConfig, Node, DEFAULT_KEYFRAME_INTERVAL};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::CuMonitor;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_log_runtime::LoggerRuntime;
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};
use cu29_unifiedlog::UnifiedLoggerWrite;
use std::sync::{Arc, Mutex};

use bincode::error::EncodeError;
use bincode::{encode_into_std_write, Decode, Encode};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;
use rayon::ThreadPool;
use std::fmt::Debug;
/// Just a simple struct to hold the various bits needed to run a Copper application.
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
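    /// Marks the copper list `culistid` as done, then walks the ring from the top and,
    /// for each contiguous `DoneProcessing` list, serializes it (if a logger is set) and
    /// frees it. Lists completed out of order stay queued until everything above them is done.
    ///
    /// A minimal sketch of the lifecycle (hypothetical `manager`; see the unit tests at the
    /// bottom of this file for a runnable version):
    ///
    /// ```ignore
    /// let culist = manager.inner.create().expect("ran out of copper lists");
    /// let id = culist.id;
    /// culist.change_state(CopperListState::Processing);
    /// // ... the tasks fill in the copper list ...
    /// manager.end_of_processing(id)?; // logs (if enabled) and frees it
    /// ```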
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}

/// Manages the frozen tasks state and logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored following the wave of execution of a CL.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks)
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

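    /// Serializes `task` into the current keyframe when `culistid` falls on a keyframe
    /// boundary, returning the number of bytes written (0 when the keyframe is skipped).
    ///
    /// A minimal sketch of the per-copper-list sequence (hypothetical `manager`, `task`,
    /// and `clock` values, assuming task logging is enabled):
    ///
    /// ```ignore
    /// manager.reset(culistid, &clock);        // start the keyframe for this copper list
    /// manager.freeze_task(culistid, &task)?;  // once per task, in execution order
    /// manager.end_of_processing(culistid)?;   // logs the accumulated keyframe
    /// ```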
    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}

/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// P is the copper list payload type, representing the input/output messages for all the tasks.
pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// For backgrounded tasks.
    pub threadpool: Arc<ThreadPool>,

    /// The runtime monitoring.
    pub monitor: M,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,

    /// The runtime configuration controlling the behavior of the run loop
    pub runtime_config: RuntimeConfig,
}

/// To be able to share the clock, we make the runtime a clock provider.
impl<CT, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

/// A KeyFrame records a snapshot of the tasks' state just before a given copper list is processed.
/// It is a double encapsulation: this struct is bincode-encoded, and `serialized_tasks`
/// holds yet another, inner bincode encoding of the tasks themselves.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    // This is the id of the copper list that this keyframe is associated with (recorded before the copperlist).
    pub culistid: u32,
    // This is the timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    // This is the bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Resets the keyframe in place, reusing the allocation of `serialized_tasks`.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Accumulates tasks into the serialization as they are executed after each step.
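    ///
    /// Each call appends one more bincode-encoded task to `serialized_tasks`, so the
    /// matching `Freezable::thaw` implementations must read them back in the same order.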
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let config = bincode::config::standard();
        encode_into_std_write(BincodeAdapter(task), &mut self.serialized_tasks, config)
    }
}

impl<
        CT,
        P: CopperListTuple + CuListZeroedInit + Default + 'static,
        M: CuMonitor,
        const NBCL: usize,
    > CuRuntime<CT, P, M, NBCL>
{
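    /// Builds the runtime from a [`CuConfig`]: instantiates the tasks and the monitor,
    /// wires up the copper-list and keyframe loggers according to the logging config,
    /// and creates the shared thread pool used by backgrounded tasks.
    ///
    /// A minimal sketch mirroring the unit tests below (the `Tasks`/`Msgs` types and the
    /// `FakeWriter` loggers are stand-ins from those tests, not part of this API):
    ///
    /// ```ignore
    /// let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
    ///     RobotClock::default(),
    ///     &config,
    ///     None, // default mission
    ///     tasks_instanciator,
    ///     monitor_instanciator,
    ///     FakeWriter {}, // copper lists logger
    ///     FakeWriter {}, // keyframes logger
    /// )?;
    /// ```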
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            Arc<ThreadPool>,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        // TODO: make the thread count configurable
        let threadpool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(2) // hardcoded to 2 threads until this is configurable
                .build()
                .expect("Could not create the threadpool"),
        );

        let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;

        let monitor = monitor_instanciator(config);

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            threadpool,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}

/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// NodeId: node id of the task to execute
    pub node_id: NodeId,
    /// Node: node instance
    pub node: Node,
    /// CuTaskType: type of the task
    pub task_type: CuTaskType,

    /// the indices in the copper list of the input messages and their types
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// the index in the copper list of the output message and its type
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "   CuExecutionStep: Node Id: {}", self.node_id)?;
        writeln!(f, "                  task_type: {:?}", self.node.get_type())?;
        writeln!(f, "                       task: {:?}", self.task_type)?;
        writeln!(f, "              input_msg_types: {:?}", self.input_msg_indices_types)?;
        writeln!(f, "       output_msg_type: {:?}", self.output_msg_index_type)?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (loops or steps) that are executed
/// multiple times.
/// If `loop_count` is `None`, the loop is infinite.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        write!(f, "   count: {:?}", self.loop_count)?;
        Ok(())
    }
}

/// An execution unit in the plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &[CuExecutionUnit],
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.0.neighbors_directed(node_id.into(), Incoming).count() == 0 {
        CuTaskType::Source
    } else if graph.0.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}

/// Looks up the input step by its plan id and returns the index of the config graph edge
/// that connects that input's node to the given output node.
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

/// The connection id used here is the index of the config graph edge corresponding to the
/// wanted connection.
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}

/// Explores a subbranch and builds the partial plan out of it.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(feature = "macro_debug")]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(feature = "macro_debug")]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(feature = "macro_debug")]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(feature = "macro_debug")]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(feature = "macro_debug")]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// This is the main heuristic to compute an execution plan at compile time.
/// TODO(gbin): Make that heuristic pluggable.
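///
/// Starting from every source, it BFS-walks the graph and only emits a step once all of a
/// node's inputs have been planned, so each task appears after its producers.
///
/// A minimal sketch (same setup as the unit tests below):
///
/// ```ignore
/// let mut config = CuConfig::default();
/// let graph = config.get_graph_mut(None).unwrap();
/// graph.add_node(Node::new("a", "TestSource")).unwrap();
/// graph.add_node(Node::new("b", "TestSink")).unwrap();
/// graph.connect(0, 1, "()").unwrap();
/// let plan = compute_runtime_plan(graph)?; // source step first, then the sink
/// ```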
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(feature = "macro_debug")]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: std::collections::VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(feature = "macro_debug")]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(feature = "macro_debug")]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(feature = "macro_debug")]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(feature = "macro_debug")]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(feature = "macro_debug")]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(feature = "macro_debug")]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(feature = "macro_debug")]
                    eprintln!("      → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    // These should be generated by the derive macro
    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _threadpool: Arc<ThreadPool>,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulate the generated runtime
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order; this should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Re-add a CL
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order, the #0 first
            let _ = copperlists.end_of_processing(0);
            // Should not free up the top of the stack
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Free up the top of the stack
            let _ = copperlists.end_of_processing(2);
            // This should free up 2 CLs
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // note that the source2 connection is made before the source1 one
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // the edge id depends on the order in which the connection is created, not
        // on the node id, and that is what determines the input order
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // since the src2 connection was added before the src1 connection, the src2 type
        // should come first
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // more complex topology that tripped the scheduler
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // more complex topology that tripped the scheduler, variation 2
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}