use crate::config::{ComponentConfig, Node, DEFAULT_KEYFRAME_INTERVAL};
use crate::config::{CuConfig, CuGraph, NodeId, RuntimeConfig};
use crate::copperlist::{CopperList, CopperListState, CuListZeroedInit, CuListsManager};
use crate::cutask::{BincodeAdapter, Freezable};
use crate::monitoring::CuMonitor;
use cu29_clock::{ClockProvider, CuTime, RobotClock};
use cu29_traits::CuResult;
use cu29_traits::WriteStream;
use cu29_traits::{CopperListTuple, CuError};

use bincode::enc::write::{SizeWriter, SliceWriter};
use bincode::enc::EncoderImpl;
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use core::fmt::{Debug, Formatter};
use petgraph::prelude::*;
use petgraph::visit::VisitMap;
use petgraph::visit::Visitable;

#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::boxed::Box;
    pub use alloc::collections::VecDeque;
    pub use alloc::format;
    pub use alloc::string::String;
    pub use alloc::string::ToString;
    pub use alloc::vec::Vec;
    pub use core::fmt::Result as FmtResult;
}

#[cfg(feature = "std")]
mod imp {
    pub use cu29_log_runtime::LoggerRuntime;
    pub use cu29_unifiedlog::UnifiedLoggerWrite;
    pub use rayon::ThreadPool;
    pub use std::collections::VecDeque;
    pub use std::fmt::Result as FmtResult;
    pub use std::sync::{Arc, Mutex};
}

use imp::*;

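/// Shared runtime context when running under `std`: the unified logger sink,
/// the structured-log runtime, and the robot clock.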
#[cfg(feature = "std")]
pub struct CopperContext {
    pub unified_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    pub logger_runtime: LoggerRuntime,
    pub clock: RobotClock,
}

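/// Owns the pool of `NBCL` copper lists plus, when task logging is enabled, the
/// stream that finished lists are serialized to.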
pub struct CopperListsManager<P: CopperListTuple + Default, const NBCL: usize> {
    pub inner: CuListsManager<P, NBCL>,
    pub logger: Option<Box<dyn WriteStream<CopperList<P>>>>,
}

impl<P: CopperListTuple + Default, const NBCL: usize> CopperListsManager<P, NBCL> {
    /// Marks the list with `culistid` as done, then frees (and logs, if a logger is
    /// set) every list in the contiguous run of finished lists at the top of the ring.
    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        let mut is_top = true;
        let mut nb_done = 0;
        for cl in self.inner.iter_mut() {
            if cl.id == culistid && cl.get_state() == CopperListState::Processing {
                cl.change_state(CopperListState::DoneProcessing);
            }
            // Only the contiguous run of done lists at the top of the ring can be
            // freed; a done list buried under a still-processing one has to wait.
            if is_top && cl.get_state() == CopperListState::DoneProcessing {
                if let Some(logger) = &mut self.logger {
                    cl.change_state(CopperListState::BeingSerialized);
                    logger.log(cl)?;
                }
                cl.change_state(CopperListState::Free);
                nb_done += 1;
            } else {
                is_top = false;
            }
        }
        for _ in 0..nb_done {
            let _ = self.inner.pop();
        }
        Ok(())
    }

    /// Number of copper lists still available out of the pool of `NBCL`.
    pub fn available_copper_lists(&self) -> usize {
        NBCL - self.inner.len()
    }
}

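/// Builds keyframes (serialized snapshots of all task states) at a fixed
/// copper-list interval and writes them to the keyframe stream.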
pub struct KeyFramesManager {
    /// The keyframe currently being accumulated.
    inner: KeyFrame,

    /// The stream keyframes are written to; `None` disables keyframing entirely.
    logger: Option<Box<dyn WriteStream<KeyFrame>>>,

    /// A keyframe is captured every `keyframe_interval` copper lists.
    keyframe_interval: u32,
}

impl KeyFramesManager {
    fn is_keyframe(&self, culistid: u32) -> bool {
        self.logger.is_some() && culistid.is_multiple_of(self.keyframe_interval)
    }

    pub fn reset(&mut self, culistid: u32, clock: &RobotClock) {
        if self.is_keyframe(culistid) {
            self.inner.reset(culistid, clock.now());
        }
    }

    pub fn freeze_task(&mut self, culistid: u32, task: &impl Freezable) -> CuResult<usize> {
        if self.is_keyframe(culistid) {
            if self.inner.culistid != culistid {
                panic!("Freezing task for a different culistid");
            }
            self.inner
                .add_frozen_task(task)
                .map_err(|e| CuError::from(format!("Failed to serialize task: {e}")))
        } else {
            Ok(0)
        }
    }

    pub fn end_of_processing(&mut self, culistid: u32) -> CuResult<()> {
        if self.is_keyframe(culistid) {
            let logger = self.logger.as_mut().unwrap();
            logger.log(&self.inner)
        } else {
            Ok(())
        }
    }
}

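/// The heart of Copper: owns the clock, the tuple of task instances, the monitor,
/// and the copper-list and keyframe machinery used to execute and log the graph.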
pub struct CuRuntime<CT, P: CopperListTuple, M: CuMonitor, const NBCL: usize> {
    pub clock: RobotClock,

    pub tasks: CT,

    #[cfg(feature = "std")]
    pub threadpool: Arc<ThreadPool>,

    pub monitor: M,

    pub copperlists_manager: CopperListsManager<P, NBCL>,

    pub keyframes_manager: KeyFramesManager,

    pub runtime_config: RuntimeConfig,
}

impl<CT, P: CopperListTuple + CuListZeroedInit + Default, M: CuMonitor, const NBCL: usize>
    ClockProvider for CuRuntime<CT, P, M, NBCL>
{
    fn get_clock(&self) -> RobotClock {
        self.clock.clone()
    }
}

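/// A serializable snapshot of every task's frozen state, tagged with the copper
/// list id and the time it was captured.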
#[derive(Encode, Decode)]
pub struct KeyFrame {
    /// The id of the copper list this keyframe was captured at.
    pub culistid: u32,
    /// The time the keyframe was captured.
    pub timestamp: CuTime,
    /// The concatenated, bincode-encoded state of every frozen task.
    pub serialized_tasks: Vec<u8>,
}

impl KeyFrame {
    fn new() -> Self {
        KeyFrame {
            culistid: 0,
            timestamp: CuTime::default(),
            serialized_tasks: Vec::new(),
        }
    }

    /// Starts a new keyframe for the given copper list, clearing previous task state.
    fn reset(&mut self, culistid: u32, timestamp: CuTime) {
        self.culistid = culistid;
        self.timestamp = timestamp;
        self.serialized_tasks.clear();
    }

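    /// Appends the bincode-encoded state of `task` to the keyframe buffer and returns
    /// the number of bytes written. Encoding is done in two passes: a `SizeWriter`
    /// first computes the exact encoded size, the buffer is resized once, and the
    /// real encode then writes into the freshly reserved tail through a `SliceWriter`.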
    fn add_frozen_task(&mut self, task: &impl Freezable) -> Result<usize, EncodeError> {
        let cfg = bincode::config::standard();
        let mut sizer = EncoderImpl::<_, _>::new(SizeWriter::default(), cfg);
        BincodeAdapter(task).encode(&mut sizer)?;
        let need = sizer.into_writer().bytes_written as usize;

        let start = self.serialized_tasks.len();
        self.serialized_tasks.resize(start + need, 0);
        let mut enc =
            EncoderImpl::<_, _>::new(SliceWriter::new(&mut self.serialized_tasks[start..]), cfg);
        BincodeAdapter(task).encode(&mut enc)?;
        Ok(need)
    }
}

impl<
        CT,
        P: CopperListTuple + CuListZeroedInit + Default + 'static,
        M: CuMonitor,
        const NBCL: usize,
    > CuRuntime<CT, P, M, NBCL>
{
    #[cfg(feature = "std")]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl for<'c> Fn(
            Vec<Option<&'c ComponentConfig>>,
            Arc<ThreadPool>,
        ) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let threadpool = Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(2)
                .build()
                .expect("Could not create the threadpool"),
        );

        let tasks = tasks_instanciator(all_instances_configs, threadpool.clone())?;
        let monitor = monitor_instanciator(config);

        // Task logging is on by default: an explicit logging section can disable it,
        // and no logging section at all falls back to the default keyframe interval.
        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(),
            ),
            Some(_) => (None, None, 0),
            None => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            threadpool,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }

    #[cfg(not(feature = "std"))]
    pub fn new(
        clock: RobotClock,
        config: &CuConfig,
        mission: Option<&str>,
        tasks_instanciator: impl for<'c> Fn(Vec<Option<&'c ComponentConfig>>) -> CuResult<CT>,
        monitor_instanciator: impl Fn(&CuConfig) -> M,
        copperlists_logger: impl WriteStream<CopperList<P>> + 'static,
        keyframes_logger: impl WriteStream<KeyFrame> + 'static,
    ) -> CuResult<Self> {
        let graph = config.get_graph(mission)?;
        let all_instances_configs: Vec<Option<&ComponentConfig>> = graph
            .get_all_nodes()
            .iter()
            .map(|(_, node)| node.get_instance_config())
            .collect();

        let tasks = tasks_instanciator(all_instances_configs)?;

        let monitor = monitor_instanciator(config);

        let (copperlists_logger, keyframes_logger, keyframe_interval) = match &config.logging {
            Some(logging_config) if logging_config.enable_task_logging => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                logging_config.keyframe_interval.unwrap(),
            ),
            Some(_) => (None, None, 0),
            None => (
                Some(Box::new(copperlists_logger) as Box<dyn WriteStream<CopperList<P>>>),
                Some(Box::new(keyframes_logger) as Box<dyn WriteStream<KeyFrame>>),
                DEFAULT_KEYFRAME_INTERVAL,
            ),
        };

        let copperlists_manager = CopperListsManager {
            inner: CuListsManager::new(),
            logger: copperlists_logger,
        };

        let keyframes_manager = KeyFramesManager {
            inner: KeyFrame::new(),
            logger: keyframes_logger,
            keyframe_interval,
        };

        let runtime_config = config.runtime.clone().unwrap_or_default();

        let runtime = Self {
            tasks,
            #[cfg(feature = "std")]
            threadpool,
            monitor,
            clock,
            copperlists_manager,
            keyframes_manager,
            runtime_config,
        };

        Ok(runtime)
    }
}

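/// The role of a task in the data-flow graph, derived purely from its connectivity.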
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CuTaskType {
    Source,
    Regular,
    Sink,
}

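/// One step of the execution plan: a single task invocation together with the
/// copper-list slots it reads from and writes to.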
pub struct CuExecutionStep {
    /// The id of the task to execute.
    pub node_id: NodeId,
    /// A copy of the node this step executes.
    pub node: Node,
    /// Whether the task is a source, a regular task, or a sink.
    pub task_type: CuTaskType,

    /// Copper-list index and message type for each input, ordered by connection id.
    pub input_msg_indices_types: Vec<(u32, String)>,

    /// Copper-list index and message type of the output, if the task produces one.
    pub output_msg_index_type: Option<(u32, String)>,
}

impl Debug for CuExecutionStep {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str(format!("   CuExecutionStep: Node Id: {}\n", self.node_id).as_str())?;
        f.write_str(format!("                       task: {:?}\n", self.node.get_type()).as_str())?;
        f.write_str(format!("                  task_type: {:?}\n", self.task_type).as_str())?;
        f.write_str(
            format!(
                "            input_msg_types: {:?}\n",
                self.input_msg_indices_types
            )
            .as_str(),
        )?;
        f.write_str(
            format!("           output_msg_type: {:?}\n", self.output_msg_index_type).as_str(),
        )?;
        Ok(())
    }
}

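/// A sequence of execution units forming the scheduling loop. A `loop_count` of
/// `None` means the loop runs forever.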
pub struct CuExecutionLoop {
    pub steps: Vec<CuExecutionUnit>,
    pub loop_count: Option<u32>,
}

impl Debug for CuExecutionLoop {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("CuExecutionLoop:\n")?;
        for step in &self.steps {
            match step {
                CuExecutionUnit::Step(step) => {
                    step.fmt(f)?;
                }
                CuExecutionUnit::Loop(l) => {
                    l.fmt(f)?;
                }
            }
        }

        f.write_str(format!("   count: {:?}", self.loop_count).as_str())?;
        Ok(())
    }
}

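/// A unit of the execution plan: either a single step or a nested loop.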
#[derive(Debug)]
pub enum CuExecutionUnit {
    Step(CuExecutionStep),
    Loop(CuExecutionLoop),
}

fn find_output_index_type_from_nodeid(
    node_id: NodeId,
    steps: &[CuExecutionUnit],
) -> Option<(u32, String)> {
    for step in steps {
        match step {
            CuExecutionUnit::Loop(loop_unit) => {
                if let Some(index) = find_output_index_type_from_nodeid(node_id, &loop_unit.steps) {
                    return Some(index);
                }
            }
            CuExecutionUnit::Step(step) => {
                if step.node_id == node_id {
                    return step.output_msg_index_type.clone();
                }
            }
        }
    }
    None
}

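/// Classifies a node by its connectivity: no incoming edges makes it a `Source`,
/// no outgoing edges a `Sink`, and anything else a `Regular` task.
///
/// Illustrative sketch (assumes a two-node graph `a -> b` built as in the tests below):
///
/// ```ignore
/// let src = graph.add_node(Node::new("a", "TestSource")).unwrap();
/// let sink = graph.add_node(Node::new("b", "TestSink")).unwrap();
/// graph.connect(src, sink, "()").unwrap();
/// assert_eq!(find_task_type_for_id(&graph, src), CuTaskType::Source);
/// assert_eq!(find_task_type_for_id(&graph, sink), CuTaskType::Sink);
/// ```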
pub fn find_task_type_for_id(graph: &CuGraph, node_id: NodeId) -> CuTaskType {
    if graph.0.neighbors_directed(node_id.into(), Incoming).count() == 0 {
        CuTaskType::Source
    } else if graph.0.neighbors_directed(node_id.into(), Outgoing).count() == 0 {
        CuTaskType::Sink
    } else {
        CuTaskType::Regular
    }
}

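/// Given the plan index of an input step, returns the index of the graph edge that
/// connects that step's node to `output_node_id`. Panics if the input step has not
/// been planned yet or if no such edge exists.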
fn find_edge_with_plan_input_id(
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    plan_id: u32,
    output_node_id: NodeId,
) -> usize {
    let input_node = plan
        .get(plan_id as usize)
        .expect("Input step should've been added to plan before the step that receives the input");
    let CuExecutionUnit::Step(input_step) = input_node else {
        panic!("Expected input to be from a step, not a loop");
    };
    let input_node_id = input_step.node_id;

    graph
        .0
        .edges_connecting(input_node_id.into(), output_node_id.into())
        .map(|edge| edge.id().index())
        .next()
        .expect("An edge connecting the input to the output should exist")
}

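/// Sorts a task's inputs by the id of the edge that feeds them, so the input tuple
/// order follows the order in which the connections were declared, not the node ids.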
fn sort_inputs_by_cnx_id(
    input_msg_indices_types: &mut [(u32, String)],
    plan: &[CuExecutionUnit],
    graph: &CuGraph,
    curr_node_id: NodeId,
) {
    input_msg_indices_types.sort_by(|(a_index, _), (b_index, _)| {
        let a_edge_id = find_edge_with_plan_input_id(plan, graph, *a_index, curr_node_id);
        let b_edge_id = find_edge_with_plan_input_id(plan, graph, *b_index, curr_node_id);
        a_edge_id.cmp(&b_edge_id)
    });
}

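/// Walks one BFS branch from `starting_point`, appending an execution step for every
/// node whose inputs are all planned. Returns the next free copper-list slot index
/// and whether at least one node could be handled; a node with a still-unplanned
/// input aborts the branch so that a later branch can retry it.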
fn plan_tasks_tree_branch(
    graph: &CuGraph,
    mut next_culist_output_index: u32,
    starting_point: NodeId,
    plan: &mut Vec<CuExecutionUnit>,
) -> (u32, bool) {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- starting branch from node {starting_point}");

    let mut visitor = Bfs::new(&graph.0, starting_point.into());
    let mut handled = false;

    while let Some(node) = visitor.next(&graph.0) {
        let id = node.index() as NodeId;
        let node_ref = graph.get_node(id).unwrap();
        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("  Visiting node: {node_ref:?}");

        let mut input_msg_indices_types: Vec<(u32, String)> = Vec::new();
        let output_msg_index_type: Option<(u32, String)>;
        let task_type = find_task_type_for_id(graph, id);

        match task_type {
            CuTaskType::Source => {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("  → Source node, assign output index {next_culist_output_index}");
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
            CuTaskType::Sink => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("  → Sink with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("    ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("    ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((next_culist_output_index, "()".to_string()));
                next_culist_output_index += 1;
            }
            CuTaskType::Regular => {
                let parents: Vec<NodeIndex> =
                    graph.0.neighbors_directed(id.into(), Incoming).collect();
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("  → Regular task with parents: {parents:?}");
                for parent in &parents {
                    let pid = parent.index() as NodeId;
                    let index_type = find_output_index_type_from_nodeid(pid, plan);
                    if let Some(index_type) = index_type {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("    ✓ Input from {pid} ready: {index_type:?}");
                        input_msg_indices_types.push(index_type);
                    } else {
                        #[cfg(all(feature = "std", feature = "macro_debug"))]
                        eprintln!("    ✗ Input from {pid} not ready, returning");
                        return (next_culist_output_index, handled);
                    }
                }
                output_msg_index_type = Some((
                    next_culist_output_index,
                    graph
                        .0
                        .edge_weight(EdgeIndex::new(graph.get_src_edges(id).unwrap()[0]))
                        .unwrap()
                        .msg
                        .clone(),
                ));
                next_culist_output_index += 1;
            }
        }

        sort_inputs_by_cnx_id(&mut input_msg_indices_types, plan, graph, id);

        if let Some(pos) = plan
            .iter()
            .position(|step| matches!(step, CuExecutionUnit::Step(s) if s.node_id == id))
        {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("  → Already in plan, modifying existing step");
            // The step was planned by an earlier branch: refresh its inputs and move
            // it to the back so it runs after the inputs planned by this branch.
            let mut step = plan.remove(pos);
            if let CuExecutionUnit::Step(ref mut s) = step {
                s.input_msg_indices_types = input_msg_indices_types;
            }
            plan.push(step);
        } else {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("  → New step added to plan");
            let step = CuExecutionStep {
                node_id: id,
                node: node_ref.clone(),
                task_type,
                input_msg_indices_types,
                output_msg_index_type,
            };
            plan.push(CuExecutionUnit::Step(step));
        }

        handled = true;
    }

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("-- finished branch from node {starting_point} with handled={handled}");
    (next_culist_output_index, handled)
}

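/// Computes the execution plan for the graph: starting a BFS from every source node,
/// tasks are appended once all of their inputs have been planned, yielding a single
/// flat loop that the runtime executes repeatedly.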
pub fn compute_runtime_plan(graph: &CuGraph) -> CuResult<CuExecutionLoop> {
    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("[runtime plan]");
    let visited = graph.0.visit_map();
    let mut plan = Vec::new();
    let mut next_culist_output_index = 0u32;

    let mut queue: VecDeque<NodeId> = graph
        .node_indices()
        .iter()
        .filter(|&node| find_task_type_for_id(graph, node.index() as NodeId) == CuTaskType::Source)
        .map(|node| node.index() as NodeId)
        .collect();

    #[cfg(all(feature = "std", feature = "macro_debug"))]
    eprintln!("Initial source nodes: {queue:?}");

    while let Some(start_node) = queue.pop_front() {
        if visited.is_visited(&start_node) {
            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("→ Skipping already visited source {start_node}");
            continue;
        }

        #[cfg(all(feature = "std", feature = "macro_debug"))]
        eprintln!("→ Starting BFS from source {start_node}");
        let mut bfs = Bfs::new(&graph.0, start_node.into());

        while let Some(node_index) = bfs.next(&graph.0) {
            let node_id = node_index.index() as NodeId;
            let already_in_plan = plan
                .iter()
                .any(|unit| matches!(unit, CuExecutionUnit::Step(s) if s.node_id == node_id));
            if already_in_plan {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("  → Node {node_id} already planned, skipping");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("  Planning from node {node_id}");
            let (new_index, handled) =
                plan_tasks_tree_branch(graph, next_culist_output_index, node_id, &mut plan);
            next_culist_output_index = new_index;

            if !handled {
                #[cfg(all(feature = "std", feature = "macro_debug"))]
                eprintln!("  ✗ Node {node_id} was not handled, skipping enqueue of neighbors");
                continue;
            }

            #[cfg(all(feature = "std", feature = "macro_debug"))]
            eprintln!("  ✓ Node {node_id} handled successfully, enqueueing neighbors");
            for neighbor in graph.0.neighbors(node_index) {
                if !visited.is_visited(&neighbor) {
                    let nid = neighbor.index() as NodeId;
                    #[cfg(all(feature = "std", feature = "macro_debug"))]
                    eprintln!("    → Enqueueing neighbor {nid}");
                    queue.push_back(nid);
                }
            }
        }
    }

    Ok(CuExecutionLoop {
        steps: plan,
        loop_count: None,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Node;
    use crate::cutask::CuSinkTask;
    use crate::cutask::{CuSrcTask, Freezable};
    use crate::monitoring::NoMonitor;
    use bincode::Encode;
    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks};
    use serde_derive::Serialize;

    pub struct TestSource {}

    impl Freezable for TestSource {}

    impl CuSrcTask for TestSource {
        type Output<'m> = ();
        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(
            &mut self,
            _clock: &RobotClock,
            _empty_msg: &mut Self::Output<'_>,
        ) -> CuResult<()> {
            Ok(())
        }
    }

    pub struct TestSink {}

    impl Freezable for TestSink {}

    impl CuSinkTask for TestSink {
        type Input<'m> = ();

        fn new(_config: Option<&ComponentConfig>) -> CuResult<Self>
        where
            Self: Sized,
        {
            Ok(Self {})
        }

        fn process(&mut self, _clock: &RobotClock, _input: &Self::Input<'_>) -> CuResult<()> {
            Ok(())
        }
    }

    type Tasks = (TestSource, TestSink);

    #[derive(Debug, Encode, Decode, Serialize, Default)]
    struct Msgs(());

    impl ErasedCuStampedDataSet for Msgs {
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for Msgs {
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }

    impl CuListZeroedInit for Msgs {
        fn init_zeroed(&mut self) {}
    }

    #[cfg(feature = "std")]
    fn tasks_instanciator(
        all_instances_configs: Vec<Option<&ComponentConfig>>,
        _threadpool: Arc<ThreadPool>,
    ) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    #[cfg(not(feature = "std"))]
    fn tasks_instanciator(all_instances_configs: Vec<Option<&ComponentConfig>>) -> CuResult<Tasks> {
        Ok((
            TestSource::new(all_instances_configs[0])?,
            TestSink::new(all_instances_configs[1])?,
        ))
    }

    fn monitor_instanciator(_config: &CuConfig) -> NoMonitor {
        NoMonitor {}
    }

    #[derive(Debug)]
    struct FakeWriter {}

    impl<E: Encode> WriteStream<E> for FakeWriter {
        fn log(&mut self, _obj: &E) -> CuResult<()> {
            Ok(())
        }
    }

    #[test]
    fn test_runtime_instantiation() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();
        let runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
            FakeWriter {},
        );
        assert!(runtime.is_ok());
    }

    #[test]
    fn test_copperlists_manager_lifecycle() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        graph.add_node(Node::new("a", "TestSource")).unwrap();
        graph.add_node(Node::new("b", "TestSink")).unwrap();
        graph.connect(0, 1, "()").unwrap();

        let mut runtime = CuRuntime::<Tasks, Msgs, NoMonitor, 2>::new(
            RobotClock::default(),
            &config,
            None,
            tasks_instanciator,
            monitor_instanciator,
            FakeWriter {},
            FakeWriter {},
        )
        .unwrap();

        // Take the first copper list out of the pool.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist0 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist0.id;
            assert_eq!(id, 0);
            culist0.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // Take the second copper list: the pool (NBCL = 2) is now exhausted.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist1 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist1.id;
            assert_eq!(id, 1);
            culist1.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
        }

        // A third create must fail; finishing list 1 (the top of the ring) frees a slot.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists.inner.create();
            assert!(culist2.is_none());
            assert_eq!(copperlists.available_copper_lists(), 0);
            let _ = copperlists.end_of_processing(1);
            assert_eq!(copperlists.available_copper_lists(), 1);
        }

        // List 0 is done but buried under the still-processing list 2, so nothing is
        // freed until list 2 finishes too, at which point both are released at once.
        {
            let copperlists = &mut runtime.copperlists_manager;
            let culist2 = copperlists
                .inner
                .create()
                .expect("Ran out of space for copper lists");
            let id = culist2.id;
            assert_eq!(id, 2);
            culist2.change_state(CopperListState::Processing);
            assert_eq!(copperlists.available_copper_lists(), 0);
            let _ = copperlists.end_of_processing(0);
            assert_eq!(copperlists.available_copper_lists(), 0);

            let _ = copperlists.end_of_processing(2);
            assert_eq!(copperlists.available_copper_lists(), 2);
        }
    }

    #[test]
    fn test_runtime_task_input_order() {
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let src1_id = graph.add_node(Node::new("a", "Source1")).unwrap();
        let src2_id = graph.add_node(Node::new("b", "Source2")).unwrap();
        let sink_id = graph.add_node(Node::new("c", "Sink")).unwrap();

        assert_eq!(src1_id, 0);
        assert_eq!(src2_id, 1);

        // Connect src2 before src1 so the connection order runs counter to node ids.
        let src1_type = "src1_type";
        let src2_type = "src2_type";
        graph.connect(src2_id, sink_id, src2_type).unwrap();
        graph.connect(src1_id, sink_id, src1_type).unwrap();

        let src1_edge_id = *graph.get_src_edges(src1_id).unwrap().first().unwrap();
        let src2_edge_id = *graph.get_src_edges(src2_id).unwrap().first().unwrap();
        // src2 was connected first, so it gets the lower edge id.
        assert_eq!(src1_edge_id, 1);
        assert_eq!(src2_edge_id, 0);

        let runtime = compute_runtime_plan(graph).unwrap();
        let sink_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == sink_id => Some(step),
                _ => None,
            })
            .unwrap();

        // The sink's inputs must be ordered by connection (edge) id, not by node id.
        assert_eq!(sink_step.input_msg_indices_types[0].1, src2_type);
        assert_eq!(sink_step.input_msg_indices_types[1].1, src1_type);
    }

    #[test]
    fn test_runtime_plan_diamond_case1() {
        // Diamond: cam0 feeds the sink both directly and through inf0.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        // Case 1: the direct cam0 -> broadcast connection is declared first.
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_broadcast = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_inf0 = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_inf0, 0);
        assert_eq!(edge_cam0_to_broadcast, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        // The sink's inputs are ordered by edge id: the direct i32 input first,
        // then inf0's f32 output.
        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }

    #[test]
    fn test_runtime_plan_diamond_case2() {
        // Same diamond as case 1, but cam0 -> inf0 is connected first.
        let mut config = CuConfig::default();
        let graph = config.get_graph_mut(None).unwrap();
        let cam0_id = graph
            .add_node(Node::new("cam0", "tasks::IntegerSrcTask"))
            .unwrap();
        let inf0_id = graph
            .add_node(Node::new("inf0", "tasks::Integer2FloatTask"))
            .unwrap();
        let broadcast_id = graph
            .add_node(Node::new("broadcast", "tasks::MergingSinkTask"))
            .unwrap();

        graph.connect(cam0_id, inf0_id, "i32").unwrap();
        graph.connect(cam0_id, broadcast_id, "i32").unwrap();
        graph.connect(inf0_id, broadcast_id, "f32").unwrap();

        let edge_cam0_to_inf0 = *graph.get_src_edges(cam0_id).unwrap().first().unwrap();
        let edge_cam0_to_broadcast = graph.get_src_edges(cam0_id).unwrap()[1];

        assert_eq!(edge_cam0_to_broadcast, 0);
        assert_eq!(edge_cam0_to_inf0, 1);

        let runtime = compute_runtime_plan(graph).unwrap();
        let broadcast_step = runtime
            .steps
            .iter()
            .find_map(|step| match step {
                CuExecutionUnit::Step(step) if step.node_id == broadcast_id => Some(step),
                _ => None,
            })
            .unwrap();

        // Regardless of declaration order, the sink's inputs stay ordered by edge id:
        // the direct i32 input first, then inf0's f32 output.
        assert_eq!(broadcast_step.input_msg_indices_types[0].1, "i32");
        assert_eq!(broadcast_step.input_msg_indices_types[1].1, "f32");
    }
}