cu29_runtime/curuntime.rs

//! CuRuntime is the heart of what Copper runs on the robot.
//! It is exposed to the user via the `copper_runtime` macro, which injects it as a field in their application struct.
//!

use crate::config::{ComponentConfig, Node, DEFAULT_KEYFRAME_INTERVAL};
use crate::config::{CuConfig, CuGraph, NodeId};
use crate::copperlist::{CopperList, CopperListState, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::CuMonitor;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_log_runtime::LoggerRuntime;
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};
use cu29_unifiedlog::UnifiedLoggerWrite;
use std::sync::{Arc, Mutex};

use bincode::error::EncodeError;
use bincode::{encode_into_std_write, Decode, Encode};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;
use std::fmt::Debug;

/// Just a simple struct to hold the various bits needed to run a Copper application.
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

/// Manages the lifecycle of the copper lists and logging.
pub struct CopperListsManager<P: CopperListTuple, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    /// Logger for the copper lists (messages between tasks)
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple, const NBCL: usize> CopperListsManager<P, NBCL> {
    /// Marks the copper list with the given id as done, then serializes (if a logger
    /// is set) and frees every done list sitting contiguously at the top of the stack.
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            // Only a contiguous run of done lists at the top can be freed;
            // anything below a still-processing list has to wait.
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Number of copper lists still available in the pool.
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}
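
// Usage sketch (illustrative, not part of the API surface): how generated
// runtime code is expected to drive this manager; error handling elided.
// The pattern mirrors `test_copperlists_manager_lifecycle` below.
//
//     let culist = manager.inner.create().expect("ran out of copper lists");
//     let id = culist.id;
//     culist.change_state(CopperListState::Processing);
//     // ... the application's tasks run over this copper list ...
//     manager.end_of_processing(id)?; // logs (if enabled) and frees done lists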

/// Manages the state of the frozen tasks and their logging.
pub struct KeyFramesManager {
    /// Where the serialized tasks are stored, following the wave of execution of a copper list.
    inner: KeyFrame,

    /// Logger for the state of the tasks (frozen tasks)
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// Capture a keyframe only every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid % self.keyframe_interval == 0
    }

    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}
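
// Call-sequence sketch (names illustrative): with `keyframe_interval = 100`,
// only copper lists 0, 100, 200, ... are actually captured; for every other
// id these calls are cheap no-ops.
//
//     keyframes.reset(culist.id, &clock);       // start a keyframe if one is due
//     keyframes.freeze_task(culist.id, &task)?; // once per task, after its step
//     keyframes.end_of_processing(culist.id)?;  // log the completed keyframe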

/// This is the main structure that will be injected as a member of the Application struct.
/// CT is the tuple of all the tasks in order of execution.
/// P is the payload tuple of the copper list, representing the input/output messages for all the tasks.
pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    /// The base clock the runtime will be using to record time.
    pub clock: RobotClock, // TODO: remove public at some point

    /// The tuple of all the tasks in order of execution.
    pub tasks: CT,

    /// The runtime monitoring.
    pub monitor: M,

    /// The logger for the copper lists (messages between tasks)
    pub copperlists_manager: CopperListsManager<P, NBCL>,

    /// The logger for the state of the tasks (frozen tasks)
    pub keyframes_manager: KeyFramesManager,
}

/// To be able to share the clock we make the runtime a clock provider.
impl<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> ClockProvider
    for CuRuntime<CT, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}
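
// Sharing sketch: anything that needs the runtime's time base can take a cheap
// handle, since `RobotClock` is clonable (`runtime` is illustrative):
//
//     let clock: RobotClock = runtime.get_clock();
//     let now = clock.now();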

/// A KeyFrame records a snapshot of the tasks' state taken just before a given copper list.
/// It is a double encapsulation: this struct records the culistid and timestamp, while the
/// tasks themselves are bincode-encoded a second time inside `serialized_tasks`.
#[derive(Encode, Decode)]
pub struct KeyFrame {
    /// The id of the copper list this keyframe is associated with (recorded before the copper list).
    pub culistid: u32,
    /// The timestamp when the keyframe was created, using the robot clock.
    pub timestamp: CuTime,
    /// The bincode representation of the tuple of all the tasks.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Resets the keyframe in place so its buffer can be reused without reallocating.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

    /// Accumulates tasks into the serialization buffer as they are executed, one after each step.
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let config = bincode::config::standard();
        encode_into_std_write(BincodeAdapter(task), &mut self.serialized_tasks, config)
    }
}
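
// Decoding sketch for the double encapsulation (a sketch assuming bincode 2's
// slice API; the concrete task tuple and re-thawing logic live in generated code):
//
//     let config = bincode::config::standard();
//     let (kf, _): (KeyFrame, usize) = bincode::decode_from_slice(bytes, config)?;
//     // kf.serialized_tasks is itself a bincode stream: the frozen tasks,
//     // concatenated in execution order, to be decoded task by task.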

impl<CT, P: CopperListTuple + 'static, M: CuMonitor, const NBCL: usize> CuRuntime<CT, P, M, NBCL> {
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl Fn(Vec<Option<&ComponentConfig>>) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();
        let tasks = tasks_instanciator(all_instances_configs)?;

        let monitor = monitor_instanciator(config);

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(), // it is set to a default at parsing time
            ),
            Some(_) => (None, None, 0), // task logging explicitly disabled
            None => (
                // default
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime = Self {
            tasks,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
        };

        Ok(runtime)
    }
}
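
// Construction sketch (mirrors `test_runtime_instantiation` below; `FakeWriter`
// stands in for the real unified-logger write streams and is defined in the tests):
//
//     let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
//         RobotClock::default(),
//         &config,
//         None, // default mission
//         tasks_instanciator,
//         monitor_instanciator,
//         FakeWriter {},
//         FakeWriter {},
//     )?;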

/// Copper tasks can be of 3 types:
/// - Source: only producing output messages (usually used for drivers)
/// - Regular: processing input messages and producing output messages, more like compute nodes.
/// - Sink: only consuming input messages (usually used for actuators)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

/// This structure represents a step in the execution plan.
pub struct CuExecutionStep {
    /// Node id of the task to execute.
    pub node_id: NodeId,
    /// The node instance.
    pub node: Node,
    /// The type of the task.
    pub task_type: CuTaskType,

    /// The indices in the copper list of the input messages and their types.
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// The index in the copper list of the output message and its type.
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "   CuExecutionStep: Node Id: {}", self.node_id)?;
        writeln!(f, "                  task_type: {:?}", self.node.get_type())?;
        writeln!(f, "                       task: {:?}", self.task_type)?;
        writeln!(f, "              input_msg_types: {:?}", self.input_msg_indices_types)?;
        writeln!(f, "       output_msg_type: {:?}", self.output_msg_index_type)?;
        Ok(())
    }
}

/// This structure represents a loop in the execution plan.
/// It is used to represent a sequence of execution units (loops or steps) that are executed
/// multiple times.
/// If loop_count is None, the loop is infinite.
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        write!(f, "   count: {:?}", self.loop_count)?;
        Ok(())
    }
}

/// A unit of the execution plan: either a single step or a nested loop.
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &Vec<CuExecutionUnit>,
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.0.neighbors_directed(node_id.into(), Incoming).count() == 0 {
        CuTaskType::Source
    } else if graph.0.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}
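
// Classification sketch: in a chain `cam -> inference -> motor` (node names
// illustrative), `cam` has no incoming edges and `motor` no outgoing edges:
//
//     assert_eq!(find_task_type_for_id(graph, cam_id), CuTaskType::Source);
//     assert_eq!(find_task_type_for_id(graph, inference_id), CuTaskType::Regular);
//     assert_eq!(find_task_type_for_id(graph, motor_id), CuTaskType::Sink);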

/// Resolves the input step referenced by a plan id, then returns the index of the
/// config-graph edge that connects that input to the given output node.
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

/// The connection id used here is the index of the config-graph edge that corresponds
/// to the wanted connection.
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}
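
// Ordering sketch, grounded in `test_runtime_task_input_order` below: the edge
// index follows the order in which connections are declared, not the node ids.
//
//     graph.connect(src2_id, sink_id, "src2_type")?; // edge 0
//     graph.connect(src1_id, sink_id, "src1_type")?; // edge 1
//     // After planning, the sink's inputs are ["src2_type", "src1_type"].
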
/// Explores a subbranch and builds the partial plan out of it.
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(feature = "macro_debug")]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(feature = "macro_debug")]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap() // FIXME(gbin): Error handling
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Sink with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Regular task with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(feature = "macro_debug")]
                        eprintln!("      ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0])) // FIXME(gbin): Error handling and multimission
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(feature = "macro_debug")]
            eprintln!("    → Already in plan, modifying existing step");
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(feature = "macro_debug")]
            eprintln!("    → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(feature = "macro_debug")]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

/// This is the main heuristic to compute an execution plan at compilation time.
/// TODO(gbin): Make that heuristic pluggable.
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(feature = "macro_debug")]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: std::collections::VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(feature = "macro_debug")]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(feature = "macro_debug")]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(feature = "macro_debug")]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(feature = "macro_debug")]
                eprintln!("    → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(feature = "macro_debug")]
            eprintln!("    Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(feature = "macro_debug")]
                eprintln!("    ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(feature = "macro_debug")]
            eprintln!("    ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(feature = "macro_debug")]
                    eprintln!("      → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}
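
// Planning sketch for the simplest graph (a source "a" feeding a sink "b", as
// in the tests below): the plan lists the source step first, then the sink.
//
//     let plan = compute_runtime_plan(graph)?;
//     assert_eq!(plan.steps.len(), 2); // step 0: "a" (Source), step 1: "b" (Sink)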
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuMsg, ErasedCuMsgs, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask<'_> for TestSource {
        type Output = ();
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _empty_msg: Self::Output) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask<'_> for TestSink {
        type Input = ();

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: Self::Input) -> CuResult<()> {
            Ok(())
        }
    }

    // These would normally be generated by the derive macro.
    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize)]
    struct Msgs(());

    impl ErasedCuMsgs for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuMsg> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Now emulate what the generated runtime does.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            // FIXME: error handling.
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free in order: this should let the top of the stack be serialized and freed.
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Re-add a copper list.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists"); // FIXME: error handling.
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            // Free out of order: #0 first.
            let _ = copperlists.end_of_processing(0);
            // This should not free up the top of the stack.
            assert_eq!(copperlists.available_copper_lists(), 0);

            // Freeing the top of the stack should free up 2 CLs.
            let _ = copperlists.end_of_processing(2);

            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Note that the source2 connection is made before the source1 connection.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // The edge id depends on the order in which the connections are created, not
        // on the node ids, and that is what determines the input order.
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Since the src2 connection was added before the src1 connection, the src2 type
        // should come first.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // A more complex topology that tripped the scheduler.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 1 order
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // A more complex topology that tripped the scheduler (variation 2).
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // case 2 order
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}